Unsubscribe surprisingly breaks previous subscribe #336

Closed
rollulus opened this issue Jul 15, 2019 · 3 comments

Comments

@rollulus

rollulus commented Jul 15, 2019

Subscribing to /a/# works correctly.
However, after subscribing to /a/b/# and then unsubscribing from it, the handler registered for /a/# no longer receives anything, even though tcpdump shows the broker still sending the messages.

To reproduce:

package main

import (
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func msgHandler(_ mqtt.Client, m mqtt.Message) {
	log.Printf("received message for topic %s: %s", m.Topic(), string(m.Payload()))
}

func main() {
	o := mqtt.NewClientOptions().AddBroker("tcp://" + "iot.eclipse.org:1883")
	c := mqtt.NewClient(o)
	if t := c.Connect(); t.Wait() && t.Error() != nil {
		panic(t.Error())
	}

	if t := c.Subscribe("/a/#", 1, msgHandler); t.Wait() && t.Error() != nil {
		panic(t.Error())
	}

	if t := c.Subscribe("/a/b/#", 1, msgHandler); t.Wait() && t.Error() != nil {
		panic(t.Error())
	}

	if t := c.Unsubscribe("/a/b/#"); t.Wait() && t.Error() != nil {
		panic(t.Error())
	}

	time.Sleep(time.Hour)
}

And publish messages from the console with:

mosquitto_pub -h iot.eclipse.org -t '/a/b/hello'   -d -m 'xxxx' -q 1
mosquitto_pub -h iot.eclipse.org -t '/a/hello'   -d -m 'xxxx' -q 1

My expectation is to receive those messages. In practice, nothing happens.

To verify that the example itself isn't broken, remove the second Subscribe and the Unsubscribe.

@boki

boki commented Oct 15, 2019

I think this is the same issue I ran into: router.addRoute uses route.match to find an existing route to update, so wildcards are applied when comparing the new topic against existing subscriptions. The fix seems to be to change line 102 in router.go as follows:

--- a/router.go
+++ b/router.go
@@ -105,7 +105,7 @@ func (r *router) addRoute(topic string, callback MessageHandler) {
        r.Lock()
        defer r.Unlock()
        for e := r.routes.Front(); e != nil; e = e.Next() {
-               if e.Value.(*route).match(topic) {
+               if e.Value.(*route).topic == topic {
                        r := e.Value.(*route)
                        r.callback = callback
                        return
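
To make the effect of that one-line change easier to see, here is a minimal, self-contained model of a route table. It is only a sketch, not the library's code: the route and router types, the matches helper, and replay are hypothetical names invented for illustration, and it assumes that route removal uses the same comparison as addRoute, which this thread does not confirm.

```go
package main

import (
	"fmt"
	"strings"
)

// route pairs a topic filter with a handler name. It is a simplified,
// hypothetical stand-in for the client's internal route type.
type route struct {
	filter  string
	handler string
}

// matches reports whether an MQTT topic filter (possibly containing the
// + and # wildcards) covers the given topic.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return true // multi-level wildcard covers everything that follows
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// router keeps routes in registration order. With exact set, filters are
// compared by string equality (the proposed fix); otherwise by wildcard match.
type router struct {
	routes []route
	exact  bool
}

// same decides whether an existing route counts as "the same subscription".
func (r *router) same(existing, filter string) bool {
	if r.exact {
		return existing == filter
	}
	return matches(existing, filter)
}

// addRoute updates the first route considered the same, or appends a new one.
func (r *router) addRoute(filter, handler string) {
	for i := range r.routes {
		if r.same(r.routes[i].filter, filter) {
			r.routes[i].handler = handler
			return
		}
	}
	r.routes = append(r.routes, route{filter, handler})
}

// deleteRoute removes the first route considered the same as filter.
// Assumption: removal uses the same comparison as addRoute.
func (r *router) deleteRoute(filter string) {
	for i := range r.routes {
		if r.same(r.routes[i].filter, filter) {
			r.routes = append(r.routes[:i], r.routes[i+1:]...)
			return
		}
	}
}

// handlersFor lists the handlers whose filter covers a published topic.
func (r *router) handlersFor(topic string) []string {
	var out []string
	for _, rt := range r.routes {
		if matches(rt.filter, topic) {
			out = append(out, rt.handler)
		}
	}
	return out
}

// replay repeats the subscribe/subscribe/unsubscribe sequence from the report
// and returns the handlers left for a message published to /a/hello.
func replay(exact bool) []string {
	r := &router{exact: exact}
	r.addRoute("/a/#", "msgHandler")
	r.addRoute("/a/b/#", "msgHandler")
	r.deleteRoute("/a/b/#")
	return r.handlersFor("/a/hello")
}

func main() {
	fmt.Println("wildcard comparison:", replay(false))
	fmt.Println("exact comparison:", replay(true))
}
```

With the wildcard comparison, /a/b/# is treated as the existing /a/# route, so the later removal deletes the only route and no handler is left for /a/hello. With the equality comparison, the two filters stay separate and the /a/# handler survives the unsubscribe.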

@v-zhuravlev
Contributor

v-zhuravlev commented Dec 5, 2019

Had exactly the same issue with router.addRoute.
In my case, the client subscribes to #, topic1, and topic2, with a different callback set for each. Because addRoute checks if e.Value.(*route).match(topic) {, a new subscription overwrites a previously registered route instead of creating a new one, so only the last callback fires for all routes.

What I expect in my app:
If I publish something to topic1, the callbacks for both the # and the topic1 subscriptions should fire, not just the last one.
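
The expectation described above can be sketched as a dispatch step that invokes every route whose filter covers the published topic. This is a simplified model under stated assumptions, not the client's implementation: subscription, exampleSubscriptions, dispatch, and the handler names onAll, onTopic1, and onTopic2 are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// subscription pairs a topic filter with the name of its callback.
// Hypothetical type, used only for this illustration.
type subscription struct {
	filter   string
	callback string
}

// matches reports whether an MQTT topic filter (possibly containing the
// + and # wildcards) covers the given topic.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return true // multi-level wildcard covers everything that follows
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// exampleSubscriptions mirrors the setup from the comment: three filters,
// each with its own callback.
func exampleSubscriptions() []subscription {
	return []subscription{
		{"#", "onAll"},
		{"topic1", "onTopic1"},
		{"topic2", "onTopic2"},
	}
}

// dispatch returns, in registration order, the callbacks of all
// subscriptions whose filter covers the topic.
func dispatch(subs []subscription, topic string) []string {
	var fired []string
	for _, s := range subs {
		if matches(s.filter, topic) {
			fired = append(fired, s.callback)
		}
	}
	return fired
}

func main() {
	fmt.Println(dispatch(exampleSubscriptions(), "topic1"))
}
```

In this model a message on topic1 reaches both onAll and onTopic1, while onTopic2 stays silent. That only works if each filter keeps its own route entry, which is what the equality comparison in addRoute is meant to guarantee.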

@MattBrittan
Contributor

Closing this because a fix was committed a year ago; please feel free to reopen if it's still an issue.
