-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sdk): Support Multi-Topic Pipeline Configuration #947
Conversation
Codecov Report
@@ Coverage Diff @@
## main #947 +/- ##
==========================================
+ Coverage 65.63% 65.71% +0.08%
==========================================
Files 33 33
Lines 2514 2520 +6
==========================================
+ Hits 1650 1656 +6
Misses 764 764
Partials 100 100
Continue to review full report at Codecov.
|
app-service-template/main.go
Outdated
@@ -100,7 +100,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu | |||
sample := functions.NewSample() | |||
|
|||
// TODO: Replace below functions with built in and/or your custom functions for your use case | |||
// or remove is using Pipeline By Topic below. | |||
// or remove is using Pipeline By Topics below. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// or remove is using Pipeline By Topics below. | |
// or remove if using Pipeline By Topics below. |
app-service-template/main.go
Outdated
@@ -120,8 +120,8 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu | |||
// Core Data publishes to the 'edgex/events/core/<profile-name>/<device-name>/<source-name>' topic | |||
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice | |||
// resulting in the XML to be published back to the MessageBus twice. | |||
// See <Pipeline By Topic documentation url TBD> for more details. | |||
err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#", | |||
// See <Pipeline By Topics documentation url TBD> for more details. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// See <Pipeline By Topics documentation url TBD> for more details. | |
// See https://docs.edgexfoundry.org/2.0/microservices/application/AdvancedTopics/#pipeline-per-topics for more details. |
internal/app/service.go
Outdated
if len(strings.TrimSpace(topic)) == 0 { | ||
return errors.New("topic for pipeline can not be blank") | ||
if len(topics) == 0 { | ||
return errors.New("topics for pipeline can not be blank") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return errors.New("topics for pipeline can not be blank") | |
return errors.New("topics for pipeline can not be empty") |
internal/app/service_test.go
Outdated
{"Duplicate Id", "123", TriggerTypeMessageBus, "#", transforms, true}, | ||
{"HTTP Trigger Type", "127", TriggerTypeHTTP, "#", transforms, true}, | ||
{"Happy Path", "123", TriggerTypeMessageBus, defaultTopics, transforms, false}, | ||
{"Duplicate Id", "123", TriggerTypeMessageBus, defaultTopics, transforms, true}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test depends on pervious test to have run. Correct? This is not good practice as I can't run this test by itself. This is way I used service.SetDefaultFunctionsPipeline
above. Does look like I had something else messaged up... ;-) "Default Id", interfaces.DefaultPipelineId,
should have been the "Duplicate Id"
internal/common/config.go
Outdated
@@ -162,12 +162,12 @@ type PipelineInfo struct { | |||
Functions map[string]PipelineFunction | |||
} | |||
|
|||
// TopicPipeline define the data to a Pre Topic functions pipeline | |||
// TopicPipeline define the data to a Pre Topics functions pipeline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TopicPipeline define the data to a Pre Topics functions pipeline | |
// TopicPipeline define the data to a Per Topics functions pipeline |
internal/runtime/runtime.go
Outdated
wildcardCount := strings.Count(pipelineTopic, TopicWildCard) | ||
switch wildcardCount { | ||
case 0: | ||
return incomingTopic == pipelineTopic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return incomingTopic == pipelineTopic | |
if incomingTopic == pipelineTopic { | |
return true | |
} |
internal/runtime/runtime.go
Outdated
return false | ||
} | ||
if len(pipelineLevels) > len(incomingLevels) { | ||
return false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return false | |
continue |
internal/runtime/runtime_test.go
Outdated
@@ -566,7 +566,7 @@ func TestTopicMatches(t *testing.T) { | |||
|
|||
for _, test := range tests { | |||
t.Run(test.name, func(t *testing.T) { | |||
actual := topicMatches(test.incomingTopic, test.pipelineTopic) | |||
actual := topicMatches(test.incomingTopic, []string{test.pipelineTopic}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expand the tests struct to have multiple topics and add cases for:
- first topic has no wild card and doesn't match exactly and second topic does match in some way
- first topic has levels greater than that of incoming and second topic does match in some way
Also looks like I was missing a test for a single pipeline topic that has levels greater than that of incoming
pkg/interfaces/service.go
Outdated
@@ -88,11 +88,11 @@ type ApplicationService interface { | |||
// Note that the functions are executed in the order provided in the list. | |||
// An error is returned if the list is empty. | |||
SetDefaultFunctionsPipeline(transforms ...AppFunction) error | |||
// AddFunctionsPipelineForTopic adds a functions pipeline with the specified unique id and list of Application Functions | |||
// AddFunctionsPipelineForTopics adds a functions pipeline with the specified unique id and list of Application Functions | |||
// to be executed when the incoming topic matches the specified topic. The specified topic may contain the '#' wildcard |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// to be executed when the incoming topic matches the specified topic. The specified topic may contain the '#' wildcard | |
// to be executed when the incoming topic matches any of the specified topics. The specified topic may contain the '#' wildcard |
internal/common/config.go
Outdated
type TopicPipeline struct { | ||
// Id is the unique ID of the pipeline instance | ||
Id string | ||
// Topic is the topic which must match the incoming topic inorder for the pipeline to execute | ||
Topic string | ||
// Topics is the set of topics matched against the incoming to determine if pipeline should execute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Topics is the set of topics matched against the incoming to determine if pipeline should execute | |
// Topics is the set of comma separated topics matched against the incoming to determine if pipeline should execute |
Match incoming topic against an array of topics when determining whether or not to execute a given pipeline. Allow Topics configuration as comma separated list similar to messaging triggers. Also allow configuration of non-default pipeline(s) for HTTP trigger. Signed-off-by: Alex Ullrich <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Match incoming topic against an array of topics when determining whether or not to execute a given pipeline. Allow Topics configuration as comma separated list similar to messaging triggers. Also allow configuration of non-default pipeline(s) for HTTP trigger. Signed-off-by: Alex Ullrich <[email protected]>
Match incoming topic against an array of topics when determining whether
or not to execute a given pipeline. Allow Topics configuration as comma
separated list similar to messaging triggers.
PR Checklist
Please check if your PR fulfills the following requirements:
Related Docs PR now required (if applicable)
Related Docs PR:
edgexfoundry/edgex-docs#551
If n/a for Docs PR, state why it is not applicable:
What is the current behavior?
Pipelines can only be configured to run for a single topic.
Issue Number: #945
What is the new behavior?
Pipelines can be configured with a comma-separated list of topics similar to messaging triggers.
Does this PR introduce a breaking change?
Are there any new imports or modules? If so, what are they used for and why?
Are there any specific instructions or things that should be known prior to reviewing?
Other information