Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Support Multi-Topic Pipeline Configuration #947

Merged
merged 1 commit into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
Expand Down
10 changes: 5 additions & 5 deletions app-service-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
sample := functions.NewSample()

// TODO: Replace below functions with built in and/or your custom functions for your use case
// or remove is using Pipeline By Topic below.
// or remove if using Pipeline By Topics below.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
err = app.service.SetFunctionsPipeline(
err = app.service.SetDefaultFunctionsPipeline(
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand All @@ -120,8 +120,8 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
// Core Data publishes to the 'edgex/events/core/<profile-name>/<device-name>/<source-name>' topic
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
// See <Pipeline By Topic documentation url TBD> for more details.
err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#",
// See https://docs.edgexfoundry.org/2.0/microservices/application/AdvancedTopics/#pipeline-per-topics for more details.
err = app.service.AddFunctionsPipelineForTopics("Floats", []string{"edgex/events/#/#/Random-Float-Device/#"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand All @@ -132,7 +132,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
}
// Note: This example with default above causes Events from Int32 source to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
err = app.service.AddFunctionsPipelineForTopic("Int32s", "edgex/events/#/#/#/Int32",
err = app.service.AddFunctionsPipelineForTopics("Int32s", []string{"edgex/events/#/#/#/Int32"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand Down
10 changes: 5 additions & 5 deletions app-service-template/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func TestCreateAndRunService_Success(t *testing.T) {
mockAppService.On("LoggingClient").Return(logger.NewMockClient())
mockAppService.On("GetAppSettingStrings", "DeviceNames").
Return([]string{"Random-Boolean-Device, Random-Integer-Device"}, nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("LoadCustomConfig", mock.Anything, mock.Anything, mock.Anything).
Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCreateAndRunService_SetFunctionsPipeline_Failed(t *testing.T) {
})
mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
setFunctionsPipelineCalled = true
})
Expand Down Expand Up @@ -148,9 +148,9 @@ func TestCreateAndRunService_MakeItRun_Failed(t *testing.T) {
})
mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("MakeItRun").Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
makeItRunCalled = true
Expand Down
25 changes: 13 additions & 12 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
pipeline := interfaces.FunctionPipeline{
Id: interfaces.DefaultPipelineId,
Transforms: transforms,
Topic: runtime.TopicWildCard,
Topics: []string{runtime.TopicWildCard},
}
pipelines[pipeline.Id] = pipeline
}
Expand All @@ -289,7 +289,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
pipeline := interfaces.FunctionPipeline{
Id: perTopicPipeline.Id,
Transforms: transforms,
Topic: perTopicPipeline.Topic,
Topics: util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.Topics, util.SplitComma)),
}

pipelines[pipeline.Id] = pipeline
Expand Down Expand Up @@ -384,26 +384,27 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc
return nil
}

// AddFunctionsPipelineForTopic adds a functions pipeline for the specified for the specified id and topic
func (svc *Service) AddFunctionsPipelineForTopic(id string, topic string, transforms ...interfaces.AppFunction) error {
if strings.ToUpper(svc.config.Trigger.Type) == TriggerTypeHTTP {
return errors.New("pipeline per topic not valid with HTTP trigger")
}

// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
if len(transforms) == 0 {
return errors.New("no transforms provided to pipeline")
}

if len(strings.TrimSpace(topic)) == 0 {
return errors.New("topic for pipeline can not be blank")
if len(topics) == 0 {
return errors.New("topics for pipeline can not be empty")
}

err := svc.runtime.AddFunctionsPipeline(id, topic, transforms)
for _, t := range topics {
if strings.TrimSpace(t) == "" {
return errors.New("blank topic not allowed")
}
}
err := svc.runtime.AddFunctionsPipeline(id, topics, transforms)
if err != nil {
return err
}

svc.lc.Debugf("Pipeline '%s' added for topic '%s' with %d transform(s)", id, topic, len(transforms))
svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, topics, len(transforms))
return nil
}

Expand Down
27 changes: 14 additions & 13 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) {
require.NoError(t, err)
}

func TestService_AddFunctionsPipelineForTopic(t *testing.T) {
func TestService_AddFunctionsPipelineForTopics(t *testing.T) {
service := Service{
lc: lc,
runtime: runtime.NewGolangRuntime("", nil, nil),
Expand All @@ -313,32 +313,33 @@ func TestService_AddFunctionsPipelineForTopic(t *testing.T) {

transforms := []interfaces.AppFunction{tags.AddTags}

// This sets the Default Pipeline allowing to test for duplicate iD.
err := service.SetDefaultFunctionsPipeline(transforms...)

require.NoError(t, err)

defaultTopics := []string{"#"}

tests := []struct {
name string
id string
trigger string
topic string
topics []string
transforms []interfaces.AppFunction
expectError bool
}{
{"Happy Path", "123", TriggerTypeMessageBus, "#", transforms, false},
{"Happy Path Custom", "124", "CUSTOM TRIGGER", "#", transforms, false},
{"Empty Topic", "125", TriggerTypeMessageBus, " ", transforms, true},
{"No Transforms", "126", TriggerTypeMessageBus, "#", nil, true},
{"Default Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, "#", transforms, true},
{"Duplicate Id", "123", TriggerTypeMessageBus, "#", transforms, true},
{"HTTP Trigger Type", "127", TriggerTypeHTTP, "#", transforms, true},
{"Happy Path", "123", TriggerTypeMessageBus, defaultTopics, transforms, false},
{"Duplicate Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, defaultTopics, transforms, true},
{"Happy Path Custom", "124", "CUSTOM TRIGGER", defaultTopics, transforms, false},
{"Empty Topic", "125", TriggerTypeMessageBus, []string{" "}, transforms, true},
{"Empty Topics", "126", TriggerTypeMessageBus, nil, transforms, true},
{"No Transforms", "127", TriggerTypeMessageBus, defaultTopics, nil, true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
service.config.Trigger.Type = test.trigger

err := service.AddFunctionsPipelineForTopic(test.id, test.topic, test.transforms...)
err := service.AddFunctionsPipelineForTopics(test.id, test.topics, test.transforms...)
if test.expectError {
require.Error(t, err)
return
Expand Down Expand Up @@ -480,7 +481,7 @@ func TestLoadConfigurableFunctionPipelinesDefaultNotFound(t *testing.T) {
if len(test.perTopicExecutionOrder) > 0 {
service.config.Writable.Pipeline.PerTopicPipelines["bogus"] = common.TopicPipeline{
Id: "bogus",
Topic: "#",
Topics: "#",
ExecutionOrder: test.perTopicExecutionOrder,
}
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestLoadConfigurableFunctionPipelinesNumFunctions(t *testing.T) {
PerTopicPipelines: map[string]common.TopicPipeline{
perTopicPipelineId: {
Id: perTopicPipelineId,
Topic: "#",
Topics: "#",
ExecutionOrder: "FilterByDeviceName, Transform, SetResponseData",
},
},
Expand Down
6 changes: 3 additions & 3 deletions internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ type PipelineInfo struct {
Functions map[string]PipelineFunction
}

// TopicPipeline define the data to a Pre Topic functions pipeline
// TopicPipeline define the data to a Per Topics functions pipeline
type TopicPipeline struct {
// Id is the unique ID of the pipeline instance
Id string
// Topic is the topic which must match the incoming topic inorder for the pipeline to execute
Topic string
// Topics is the set of comma separated topics matched against the incoming to determine if pipeline should execute
Topics string
// ExecutionOrder is a list of functions, in execution order, for the pipeline instance
ExecutionOrder string
}
Expand Down
62 changes: 35 additions & 27 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright (c) 2021 Intel Corporation
// Copyright (c) 2021 One Track Consulting
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,11 +48,11 @@ const (
TopicLevelSeparator = "/"
)

func NewFunctionPipeline(id string, topic string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline {
func NewFunctionPipeline(id string, topics []string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline {
pipeline := interfaces.FunctionPipeline{
Id: id,
Transforms: transforms,
Topic: topic,
Topics: topics,
Hash: calculatePipelineHash(transforms),
}

Expand Down Expand Up @@ -99,17 +100,17 @@ func (gr *GolangRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.App
return nil
}

return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, TopicWildCard, transforms)
return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, transforms)
}

// AddFunctionsPipeline is thread safe to set transforms
func (gr *GolangRuntime) AddFunctionsPipeline(id string, topic string, transforms []interfaces.AppFunction) error {
func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error {
_, exists := gr.pipelines[id]
if exists {
return fmt.Errorf("pipeline with Id='%s' already exists", id)
}

pipeline := NewFunctionPipeline(id, topic, transforms)
pipeline := NewFunctionPipeline(id, topics, transforms)
gr.isBusyCopying.Lock()
gr.pipelines[id] = &pipeline
gr.isBusyCopying.Unlock()
Expand Down Expand Up @@ -206,7 +207,7 @@ func (gr *GolangRuntime) ProcessMessage(
execPipeline := &interfaces.FunctionPipeline{
Id: pipeline.Id,
Transforms: make([]interfaces.AppFunction, len(pipeline.Transforms)),
Topic: pipeline.Topic,
Topics: pipeline.Topics,
Hash: pipeline.Hash,
}
copy(execPipeline.Transforms, pipeline.Transforms)
Expand Down Expand Up @@ -381,7 +382,7 @@ func (gr *GolangRuntime) GetMatchingPipelines(incomingTopic string) []*interface
}

for _, pipeline := range gr.pipelines {
if topicMatches(incomingTopic, pipeline.Topic) {
if topicMatches(incomingTopic, pipeline.Topics) {
matches = append(matches, pipeline)
}
}
Expand All @@ -393,32 +394,39 @@ func (gr *GolangRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline
return gr.pipelines[id]
}

func topicMatches(incomingTopic string, pipelineTopic string) bool {
if pipelineTopic == TopicWildCard {
return true
}
func topicMatches(incomingTopic string, pipelineTopics []string) bool {
for _, pipelineTopic := range pipelineTopics {
if pipelineTopic == TopicWildCard {
return true
}

wildcardCount := strings.Count(pipelineTopic, TopicWildCard)
switch wildcardCount {
case 0:
return incomingTopic == pipelineTopic
default:
pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator)
incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator)
wildcardCount := strings.Count(pipelineTopic, TopicWildCard)
switch wildcardCount {
case 0:
if incomingTopic == pipelineTopic {
return true
}
default:
pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator)
incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator)

if len(pipelineLevels) > len(incomingLevels) {
return false
}
if len(pipelineLevels) > len(incomingLevels) {
continue
}

for index, level := range pipelineLevels {
if level == TopicWildCard {
incomingLevels[index] = TopicWildCard
for index, level := range pipelineLevels {
if level == TopicWildCard {
incomingLevels[index] = TopicWildCard
}
}
}

incomingWithWildCards := strings.Join(incomingLevels, "/")
return strings.Index(incomingWithWildCards, pipelineTopic) == 0
incomingWithWildCards := strings.Join(incomingLevels, "/")
if strings.Index(incomingWithWildCards, pipelineTopic) == 0 {
return true
}
}
}
return false
}

func calculatePipelineHash(transforms []interfaces.AppFunction) string {
Expand Down
Loading