Skip to content

Commit

Permalink
feat(triggers): Add MQTT Trigger with secure connection options
Browse files Browse the repository at this point in the history
close #454

Signed-off-by: lenny <[email protected]>
  • Loading branch information
lenny committed Sep 22, 2020
1 parent a28a1e2 commit 5885ac0
Show file tree
Hide file tree
Showing 8 changed files with 757 additions and 440 deletions.
18 changes: 16 additions & 2 deletions appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/util"
)
Expand Down Expand Up @@ -149,8 +150,12 @@ func (sdk *AppFunctionsSDK) MakeItRun() error {

sdk.runtime.Initialize(sdk.storeClient, sdk.secretProvider)
sdk.runtime.SetTransforms(sdk.transforms)

// determine input type and create trigger for it
t := sdk.setupTrigger(sdk.config, sdk.runtime)
if t == nil {
return errors.New("Failed to create Trigger")
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel)
Expand Down Expand Up @@ -435,9 +440,18 @@ func (sdk *AppFunctionsSDK) setupTrigger(configuration *common.ConfigurationStru
case "HTTP":
sdk.LoggingClient.Info("HTTP trigger selected")
t = &http.Trigger{Configuration: configuration, Runtime: runtime, Webserver: sdk.webserver, EdgeXClients: sdk.edgexClients}
case "MESSAGEBUS":
sdk.LoggingClient.Info("MessageBus trigger selected")

case "MESSAGEBUS",
"EDGEX-MESSAGEBUS": // Allows for more explicit name now that we have plain MQTT option also
sdk.LoggingClient.Info("EdgeX MessageBus trigger selected")
t = &messagebus.Trigger{Configuration: configuration, Runtime: runtime, EdgeXClients: sdk.edgexClients}

case "EXTERNAL-MQTT":
sdk.LoggingClient.Info("External MQTT trigger selected")
t = mqtt.NewTrigger(configuration, runtime, sdk.edgexClients, sdk.secretProvider)

default:
sdk.LoggingClient.Error(fmt.Sprintf("Invalid Trigger type of '%s' specified", configuration.Binding.Type))
}

return t
Expand Down
29 changes: 28 additions & 1 deletion internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type ConfigurationStruct struct {
Service ServiceInfo
// MessageBus
MessageBus types.MessageBusConfig
// MqttBroker
MqttBroker MqttBrokerConfig
// Binding
Binding BindingInfo
// ApplicationSettings
Expand Down Expand Up @@ -88,12 +90,37 @@ type BindingInfo struct {
//
// example: messagebus
// required: true
// enum: messagebus,http
// enum: messagebus (edgex-messagebus), http, external-mqtt
Type string
SubscribeTopic string
PublishTopic string
}

// MqttBrokerConfig contains the MQTT broker configuration for MQTT Trigger
type MqttBrokerConfig struct {
// Url contains the fully qualified URL to connect to the MQTT broker
Url string
// ClientId to connect to the broker with.
ClientId string
// ConnectTimeout is a time duration indicating how long to wait timing out on the broker connection
ConnectTimeout string
// AutoReconnect indicated whether or not to retry connection if disconnected
AutoReconnect bool
// KeepAlive is seconds between client ping when no active data flowing to avoid client being disconnected
KeepAlive int64
// QoS for MQTT Connection
QoS byte
// Retain setting for MQTT Connection
Retain bool
// SkipCertVerify indicates if the certificate verification should be skipped
SkipCertVerify bool
// SecretPath is the name of the path in secret provider to retrieve your secrets
SecretPath string
// AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert".
// If a CA Cert exists in the SecretPath then it will be used for all modes except "none".
AuthMode string
}

type PipelineInfo struct {
ExecutionOrder string
UseTargetTypeOfByteArray bool
Expand Down
183 changes: 183 additions & 0 deletions internal/trigger/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//
// Copyright (c) 2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package mqtt

import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"

pahoMqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/edgexfoundry/go-mod-bootstrap/bootstrap"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"github.com/google/uuid"

"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/internal/security"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/secure"
)

// Trigger implements Trigger to support Triggers
type Trigger struct {
configuration *common.ConfigurationStruct
mqttClient pahoMqtt.Client
runtime *runtime.GolangRuntime
edgeXClients common.EdgeXClients
secretProvider security.SecretProvider
}

func NewTrigger(
configuration *common.ConfigurationStruct,
runtime *runtime.GolangRuntime,
clients common.EdgeXClients,
secretProvider security.SecretProvider) *Trigger {
return &Trigger{
configuration: configuration,
runtime: runtime,
edgeXClients: clients,
secretProvider: secretProvider,
}
}

// Initialize initializes the Trigger for an external MQTT broker
func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
brokerConfig := trigger.configuration.MqttBroker
topic := trigger.configuration.Binding.SubscribeTopic

logger.Info("Initializing MQTT Trigger")

if background != nil {
return nil, errors.New("background publishing not supported for services using MQTT trigger")
}

if len(topic) == 0 {
return nil, fmt.Errorf("missing SubscribeTopic for MQTT Trigger. Must be present in [Binding] section.")
}

brokerUrl, err := url.Parse(brokerConfig.Url)
if err != nil {
return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", trigger.configuration.MqttBroker.Url, err.Error())
}

opts := pahoMqtt.NewClientOptions()
opts.AutoReconnect = brokerConfig.AutoReconnect
opts.ClientID = brokerConfig.ClientId
if len(brokerConfig.ConnectTimeout) > 0 {
duration, err := time.ParseDuration(brokerConfig.ConnectTimeout)
if err != nil {
return nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error())
}
opts.ConnectTimeout = duration
}
opts.KeepAlive = brokerConfig.KeepAlive
opts.Servers = []*url.URL{brokerUrl}

mqttFactory := secure.NewMqttFactory(
logger,
trigger.secretProvider,
brokerConfig.AuthMode,
brokerConfig.SecretPath,
brokerConfig.SkipCertVerify,
)

mqttClient, err := mqttFactory.Create(opts)
if err != nil {
return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error())
}

logger.Info(fmt.Sprintf("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl))

if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error())
}
logger.Info("Connected to mqtt server for MQTT trigger")

if token := mqttClient.Subscribe(topic, brokerConfig.QoS, trigger.messageHandler); token.Wait() && token.Error() != nil {
mqttClient.Disconnect(0)
return nil, fmt.Errorf("could not subscribe to topic '%s' for MQTT trigger: %s", topic, token.Error().Error())
}

logger.Info(fmt.Sprintf("Subscribed to topic '%s' for MQTT trigger", topic))

deferred := func() {
logger.Info("Disconnecting from broker for MQTT trigger")
trigger.mqttClient.Disconnect(0)
}

trigger.mqttClient = mqttClient

return deferred, nil
}

func (trigger *Trigger) messageHandler(client pahoMqtt.Client, message pahoMqtt.Message) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
brokerConfig := trigger.configuration.MqttBroker
topic := trigger.configuration.Binding.PublishTopic

data := message.Payload()
contentType := clients.ContentTypeJSON
if data[0] != byte('{') {
// If not JSON then assume it is CBOR
contentType = clients.ContentTypeCBOR
}

correlationID := uuid.New().String()

edgexContext := &appcontext.Context{
CorrelationID: correlationID,
Configuration: trigger.configuration,
LoggingClient: trigger.edgeXClients.LoggingClient,
EventClient: trigger.edgeXClients.EventClient,
ValueDescriptorClient: trigger.edgeXClients.ValueDescriptorClient,
CommandClient: trigger.edgeXClients.CommandClient,
NotificationsClient: trigger.edgeXClients.NotificationsClient,
}

logger.Trace("Received message from MQTT Trigger", clients.CorrelationHeader, correlationID)
logger.Debug(fmt.Sprintf("Received message from MQTT Trigger with %d bytes", len(data)), clients.ContentType, contentType)

envelope := types.MessageEnvelope{
CorrelationID: correlationID,
ContentType: contentType,
Payload: data,
}

messageError := trigger.runtime.ProcessMessage(edgexContext, envelope)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
// ToDo: Do we want to publish the error back to the Broker?
return
}

if len(edgexContext.OutputData) > 0 && len(topic) > 0 {
if token := client.Publish(topic, brokerConfig.QoS, brokerConfig.Retain, edgexContext.OutputData); token.Wait() && token.Error() != nil {
logger.Error("could not publish to topic '%s' for MQTT trigger: %s", topic, token.Error().Error())
} else {
logger.Trace("Sent MQTT Trigger response message", clients.CorrelationHeader, correlationID)
logger.Debug(fmt.Sprintf("Sent MQTT Trigger response message on topic '%s' with %d bytes", topic, len(edgexContext.OutputData)))
}
}
}
2 changes: 1 addition & 1 deletion internal/webserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConfigureAndConfigRoute(t *testing.T) {
rr := httptest.NewRecorder()
webserver.router.ServeHTTP(rr, req)

expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n"
expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"MqttBroker":{"Url":"","ClientId":"","ConnectTimeout":"","AutoReconnect":false,"KeepAlive":0,"QoS":0,"Retain":false,"SkipCertVerify":false,"SecretPath":"","AuthMode":""},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n"

body := rr.Body.String()
assert.Equal(t, expected, body)
Expand Down
Loading

0 comments on commit 5885ac0

Please sign in to comment.