Skip to content

Commit

Permalink
fix(triggers): MQTT subscribe via onConnect handler so re-subscribe o…
Browse files Browse the repository at this point in the history
…n reconnects (#537)

closes #531

Signed-off-by: lenny <[email protected]>
  • Loading branch information
lenny-goodell authored Oct 14, 2020
1 parent 7fa6067 commit c8e7ff0
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions internal/trigger/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro

opts := pahoMqtt.NewClientOptions()
opts.AutoReconnect = brokerConfig.AutoReconnect
opts.OnConnect = trigger.onConnectHandler
opts.ClientID = brokerConfig.ClientId
if len(brokerConfig.ConnectTimeout) > 0 {
duration, err := time.ParseDuration(brokerConfig.ConnectTimeout)
Expand Down Expand Up @@ -112,14 +113,8 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error())
}
logger.Info("Connected to mqtt server for MQTT trigger")

if token := mqttClient.Subscribe(topic, brokerConfig.QoS, trigger.messageHandler); token.Wait() && token.Error() != nil {
mqttClient.Disconnect(0)
return nil, fmt.Errorf("could not subscribe to topic '%s' for MQTT trigger: %s", topic, token.Error().Error())
}

logger.Info(fmt.Sprintf("Subscribed to topic '%s' for MQTT trigger", topic))
logger.Info("Connected to mqtt server for MQTT trigger")

deferred := func() {
logger.Info("Disconnecting from broker for MQTT trigger")
Expand All @@ -131,6 +126,22 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro
return deferred, nil
}

func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
topic := trigger.configuration.Binding.SubscribeTopic
qos := trigger.configuration.MqttBroker.QoS

if token := mqttClient.Subscribe(topic, qos, trigger.messageHandler); token.Wait() && token.Error() != nil {
mqttClient.Disconnect(0)
logger.Error(fmt.Sprintf("could not subscribe to topic '%s' for MQTT trigger: %s",
topic, token.Error().Error()))
return
}

logger.Info(fmt.Sprintf("Subscribed to topic '%s' for MQTT trigger", topic))
}

func (trigger *Trigger) messageHandler(client pahoMqtt.Client, message pahoMqtt.Message) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
Expand Down

0 comments on commit c8e7ff0

Please sign in to comment.