Skip to content

Commit

Permalink
feat(sdk): Custom Trigger Multi-Pipeline Support
Browse files Browse the repository at this point in the history
Add service.processMessageOnRuntime to use default pipeline if configured or attempt to use topic matching logic.  Deprecate TriggerMessageProcessor and
TriggerContextBuilder and inline the existing functions on TriggerConfig as the approach used there will not work with multiple pipelines. Replace with
MessageProcessor that only takes the message envelope and builds context(s) as needed.

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Sep 10, 2021
1 parent a082b2d commit 4d23791
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 3 deletions.
6 changes: 6 additions & 0 deletions app-service-template/Attribution.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ https://github.com/gorilla/mux/blob/master/LICENSE
hashicorp/consul/api (Mozilla Public License 2.0) - https://github.com/hashicorp/consul/api
https://github.com/hashicorp/consul/blob/master/LICENSE

hashicorp/errwrap (Mozilla Public License 2.0) https://github.com/hashicorp/errwrap
https://github.com/hashicorp/errwrap/blob/master/LICENSE

hashicorp/go-cleanhttp (Mozilla Public License 2.0) - https://github.com/hashicorp/go-cleanhttp
https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE

hashicorp/go-immutable-radix (Mozilla Public License 2.0) https://github.com/hashicorp/go-immutable-radix
https://github.com/hashicorp/go-immutable-radix/blob/master/LICENSE

hashicorp/go-multierror (Mozilla Public License 2.0) https://github.com/hashicorp/go-multierror
https://github.com/hashicorp/go-multierror/blob/master/LICENSE

hashicorp/go-rootcerts (Mozilla Public License 2.0) https://github.com/hashicorp/go-rootcerts
https://github.com/hashicorp/go-rootcerts/blob/master/LICENSE

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ require (
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.0
github.com/stretchr/testify v1.7.0
)
59 changes: 59 additions & 0 deletions internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package app
import (
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

Expand Down Expand Up @@ -61,13 +63,69 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
Logger: sdk.lc,
ContextBuilder: sdk.defaultTriggerContextBuilder,
MessageProcessor: sdk.defaultTriggerMessageProcessor,
ProcessMessage: sdk.processMessageOnRuntime,
ConfigLoader: sdk.defaultConfigLoader,
})
}

return nil
}

func (svc *Service) buildContextForRuntime(envelope types.MessageEnvelope) *appfunction.Context {
context := appfunction.NewContext(envelope.CorrelationID, svc.dic, envelope.ContentType)

if envelope.ReceivedTopic != "" {
context.AddValue(interfaces.RECEIVEDTOPIC, envelope.ReceivedTopic)
}

return context
}
func (svc *Service) processMessageOnRuntime(envelope types.MessageEnvelope) error {
context := svc.buildContextForRuntime(envelope)
defaultPipeline := svc.runtime.GetDefaultPipeline()

if defaultPipeline != nil { // then we aren't configured for topic matching, use default
context.LoggingClient().Debug("trigger using default pipeline")

messageError := svc.runtime.ProcessMessage(context, envelope, defaultPipeline)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
return messageError.Err
}
} else { // route to pipeline(s) via topic match
context.LoggingClient().Debug("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

pipelines := svc.runtime.GetMatchingPipelines(envelope.ReceivedTopic)

context.LoggingClient().Debugf("trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
go func(p *interfaces.FunctionPipeline, e error, wg *sync.WaitGroup) {
wg.Add(1)
ctx := svc.buildContextForRuntime(envelope)
if msgErr := svc.runtime.ProcessMessage(ctx, envelope, p); msgErr != nil {
e = multierror.Append(e, msgErr.Err)
}

wg.Done()
}(pipeline, finalErr, &pipelinesWaitGroup)
}

pipelinesWaitGroup.Wait()

if finalErr != nil {
return finalErr
}
}

return nil
}

//Deprecated
func (svc *Service) defaultTriggerMessageProcessor(appContext interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
context, ok := appContext.(*appfunction.Context)
if !ok {
Expand All @@ -84,6 +142,7 @@ func (svc *Service) defaultTriggerMessageProcessor(appContext interfaces.AppFunc
return nil
}

// Deprecated
func (svc *Service) defaultTriggerContextBuilder(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, svc.dic, env.ContentType)
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/interfaces/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@ import (

// TriggerConfig provides a container to pass context needed for user defined triggers
type TriggerConfig struct {
Logger logger.LoggingClient
ContextBuilder TriggerContextBuilder
// Logger exposes the logging client passed from the service
Logger logger.LoggingClient
// ContextBuilder contructs a context the trigger can specify for processing the received message
// Deprecated: only needed when using MessageProcessor
ContextBuilder TriggerContextBuilder
// MessageProcessor processes a message on the services default pipeline
// Deprecated: use ProcessMessage so that custom triggers can feed data to services configured with multiple pipelines
MessageProcessor TriggerMessageProcessor
ConfigLoader TriggerConfigLoader
// ProcessMessage is a function of type MessageProcessor that is used to deliver messages to the runtime.
ProcessMessage MessageProcessor
// ConfigLoader is a function of type TriggerConfigLoader that can be used to load custom configuration sections for the trigger.s
ConfigLoader TriggerConfigLoader
}

// Trigger provides an abstract means to pass messages to the function pipeline
Expand All @@ -40,9 +48,15 @@ type Trigger interface {
}

// TriggerMessageProcessor provides an interface that can be used by custom triggers to invoke the runtime
// Deprecated: use MessageProcessor instead
type TriggerMessageProcessor func(ctx AppFunctionContext, envelope types.MessageEnvelope) error

// TriggerContextBuilder provides an interface to construct an AppFunctionContext for message
// Deprecated: only used with legacy TriggerMessageProcessor
type TriggerContextBuilder func(env types.MessageEnvelope) AppFunctionContext

// MessageProcessor provides an interface that can be used by custom triggers to invoke the runtime
type MessageProcessor func(envelope types.MessageEnvelope) error

// TriggerConfigLoader provides an interface that can be used by custom triggers to load custom configuration elements
type TriggerConfigLoader func(config UpdatableConfig, sectionName string) error

0 comments on commit 4d23791

Please sign in to comment.