Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

fix topic subscribing bug #426

Merged
merged 5 commits into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion pkg/broker/client/sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) {
go func() {
for {
<-time.After(10 * time.Millisecond)

now := time.Now()
evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)}
require.NoError(t, broker.Publish("remote/instance1", evt))
Expand All @@ -214,6 +213,86 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) {

}

func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
type event struct {
Time int64
Message string
}

socketFile := tempSocket()
socket := "unix://broker" + socketFile

broker, err := server.ListenAndServeOnSocket(socketFile)
require.NoError(t, err)

received1 := make(chan event)
received2 := make(chan event)

opts := Options{SocketDir: filepath.Dir(socketFile)}

topic1, errs1, err := Subscribe(socket, "local/instance", opts)
require.NoError(t, err)
go func() {
for {
select {
case e := <-errs1:
panic(e)
case m, ok := <-topic1:
if ok {
var val event
require.NoError(t, m.Decode(&val))
received1 <- val
} else {
close(received1)
}
}
}
}()

topic2, errs2, err := Subscribe(socket, "local/instancetest", opts)
require.NoError(t, err)
go func() {
for {
select {
case e := <-errs2:
panic(e)
case m, ok := <-topic2:
if ok {
var val event
require.NoError(t, m.Decode(&val))
received2 <- val
} else {
close(received2)
}
}
}
}()

go func() {
for {
<-time.After(10 * time.Millisecond)
now := time.Now()
evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)}
require.NoError(t, broker.Publish("local/instance", evt))
evt = event{Time: now.Add(1 * time.Minute).UnixNano(), Message: fmt.Sprintf("Now is %v", now.Add(1*time.Minute))}
require.NoError(t, broker.Publish("local/instancetest", evt))
}
}()

// Test a few rounds to make sure all subscribers get the same messages each round.
for i := 0; i < 5; i++ {
fmt.Print("runtestcycle")
b := <-received2
a := <-received1
require.NotNil(t, a)
require.NotEqual(t, "", a.Message)
require.NotEqual(t, a, b)
}

broker.Stop()

}

// This tests the case where the broker is mapped to url route (e.g. /events)
// In this case, we need to specify the path option in the connection options so that
// we can properly connect to the broker at the url prefix.
Expand Down
18 changes: 18 additions & 0 deletions pkg/broker/server/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"net/http"
"strings"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -159,6 +160,19 @@ func (b *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}
}

//checkPath Compare the subscribed topic with the published topic to see if the message can be sent to the client.
func (b *Broker) checkPath(subscribedPath string, publishedPath string) bool {
if subscribedPath != "/" {
pPath := strings.Split(publishedPath, "/")
for i, sliceTopic := range strings.Split(subscribedPath, "/") {
if pPath[i] != sliceTopic {
return false
}
}
}
return true
}

func (b *Broker) run() {
for {
select {
Expand Down Expand Up @@ -244,6 +258,10 @@ func (b *Broker) run() {

panic("assert-failed")
}
// Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched.
if !b.checkPath(key, event.topic) {
return false
}

for ch := range chset {
select {
Expand Down