Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk)!: remove deprecated Process code #1240

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
messageProcessor := NewTriggerMessageProcessor(serviceBinding, svc.MetricsManager())

cfg := interfaces.TriggerConfig{
Logger: sdk.lc,
ContextBuilder: serviceBinding.BuildContext,
MessageProcessor: messageProcessor.Process,
MessageReceived: messageProcessor.MessageReceived,
ConfigLoader: serviceBinding.LoadCustomConfig,
Logger: sdk.lc,
ContextBuilder: serviceBinding.BuildContext,
MessageReceived: messageProcessor.MessageReceived,
ConfigLoader: serviceBinding.LoadCustomConfig,
}

return factory(cfg)
Expand Down
32 changes: 0 additions & 32 deletions internal/app/triggermessageprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,38 +99,6 @@ func NewTriggerMessageProcessor(bnd trigger.ServiceBinding, metricsManager boots
return mp
}

// Process provides runtime orchestration to pass the envelope / context to the pipeline.
// Deprecated: This does NOT support multi-pipeline usage. Will send a message to the default pipeline ONLY and throw if not configured. Use MessageReceived.
func (mp *triggerMessageProcessor) Process(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
mp.messagesReceived.Inc(1)
context, ok := ctx.(*appfunction.Context)
if !ok {
return fmt.Errorf("App Context must be an instance of internal appfunction.Context. Use NewAppContext to create instance.")
}

defaultPipelines := mp.serviceBinding.GetMatchingPipelines(interfaces.DefaultPipelineId)

if len(defaultPipelines) != 1 {
return fmt.Errorf("TriggerMessageProcessor is deprecated and does not support non-default or multiple pipelines. Please use TriggerMessageHandler.")
}

targetData, err, isInvalidMessage := mp.serviceBinding.DecodeMessage(context, envelope)
if err != nil {
if isInvalidMessage {
mp.invalidMessagesReceived.Inc(1)
}
return fmt.Errorf("unable to decode message: %s", err.Err.Error())
}

messageError := mp.serviceBinding.ProcessMessage(context, targetData, defaultPipelines[0])
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
return messageError.Err
}

return nil
}

// MessageReceived provides runtime orchestration to pass the envelope / context to configured pipeline(s) along with a response callback to execute on each completion.
func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope, responseHandler interfaces.PipelineResponseHandler) error {
mp.messagesReceived.Inc(1)
Expand Down
3 changes: 0 additions & 3 deletions internal/trigger/messageprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
)

type MessageProcessor interface {
// Process provides runtime orchestration to pass the envelope / context to the pipeline.
// Deprecated: will throw if multiple pipelines are configured for a message. Use MessageReceived.
Process(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error
// MessageReceived provides runtime orchestration to pass the envelope / context to configured pipeline(s)
MessageReceived(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope, outputHandler interfaces.PipelineResponseHandler) error
// ReceivedInvalidMessage is called when an invalid message is received so the metrics counter can be incremented.
Expand Down
3 changes: 0 additions & 3 deletions pkg/interfaces/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ type TriggerConfig struct {
Logger logger.LoggingClient
// ContextBuilder contructs a context the trigger can specify for processing the received message
ContextBuilder TriggerContextBuilder
// MessageProcessor processes a message on the services default pipeline
// Deprecated: use MessageReceived for multi-pipeline support
MessageProcessor TriggerMessageProcessor
// MessageReceived sends a message to the runtime for processing.
MessageReceived TriggerMessageHandler
// ConfigLoader is a function of type TriggerConfigLoader that can be used to load custom configuration sections for the trigger.s
Expand Down