From 10a6a46bfdf5a9524aa4c179de0cbac10dd70f4d Mon Sep 17 00:00:00 2001 From: Yuji Oshima Date: Tue, 7 Mar 2017 10:58:02 +0900 Subject: [PATCH 1/5] fix topic subscribing bug Signed-off-by: Yuji Oshima --- pkg/broker/server/sse.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/broker/server/sse.go b/pkg/broker/server/sse.go index 298d4270b..3c06980cd 100644 --- a/pkg/broker/server/sse.go +++ b/pkg/broker/server/sse.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "net/http" + "strings" "time" log "github.com/Sirupsen/logrus" @@ -244,6 +245,13 @@ func (b *Broker) run() { panic("assert-failed") } + // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. + notfyTopic := strings.Split(event.topic, "/") + for i, sliceTopic := range strings.Split(key, "/") { + if notfyTopic[i] != sliceTopic { + return false + } + } for ch := range chset { select { From 151bc9f68ebb41552651e111c0c05547d3f6f6ad Mon Sep 17 00:00:00 2001 From: Yuji Oshima Date: Tue, 7 Mar 2017 16:39:06 +0900 Subject: [PATCH 2/5] fix Signed-off-by: Yuji Oshima --- pkg/broker/server/sse.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/broker/server/sse.go b/pkg/broker/server/sse.go index 3c06980cd..7b3a806d1 100644 --- a/pkg/broker/server/sse.go +++ b/pkg/broker/server/sse.go @@ -245,11 +245,13 @@ func (b *Broker) run() { panic("assert-failed") } - // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. - notfyTopic := strings.Split(event.topic, "/") - for i, sliceTopic := range strings.Split(key, "/") { - if notfyTopic[i] != sliceTopic { - return false + if key != "/" { + // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. + notfyTopic := strings.Split(event.topic, "/") + for i, sliceTopic := range strings.Split(key, "/") { + if notfyTopic[i] != sliceTopic { + return false + } } } From 668cf427905b6c634ded0bd88461bd6d76dbac53 Mon Sep 17 00:00:00 2001 From: Yuji Oshima Date: Tue, 7 Mar 2017 16:39:17 +0900 Subject: [PATCH 3/5] add test Signed-off-by: Yuji Oshima --- pkg/broker/client/sse_test.go | 81 ++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/pkg/broker/client/sse_test.go b/pkg/broker/client/sse_test.go index 0ee62c47c..b55c9abca 100644 --- a/pkg/broker/client/sse_test.go +++ b/pkg/broker/client/sse_test.go @@ -193,7 +193,6 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) { go func() { for { <-time.After(10 * time.Millisecond) - now := time.Now() evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)} require.NoError(t, broker.Publish("remote/instance1", evt)) @@ -214,6 +213,86 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) { } +func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) { + type event struct { + Time int64 + Message string + } + + socketFile := tempSocket() + socket := "unix://broker" + socketFile + + broker, err := server.ListenAndServeOnSocket(socketFile) + require.NoError(t, err) + + received1 := make(chan event) + received2 := make(chan event) + + opts := Options{SocketDir: filepath.Dir(socketFile)} + + topic1, errs1, err := Subscribe(socket, "local/instance", opts) + require.NoError(t, err) + go func() { + for { + select { + case e := <-errs1: + panic(e) + case m, ok := <-topic1: + if ok { + var val event + require.NoError(t, m.Decode(&val)) + received1 <- val + } else { + close(received1) + } + } + } + }() + + topic2, errs2, err := Subscribe(socket, "local/instancetest", opts) + require.NoError(t, err) + go func() { + for { + select { + case e := <-errs2: + panic(e) + case m, ok := <-topic2: + if ok { + var val event + require.NoError(t, m.Decode(&val)) + received2 <- val + } else { + close(received2) + } + } + } + }() + + go func() { + for { + <-time.After(10 * time.Millisecond) + now := time.Now() + evt := event{Time: now.UnixNano(), Message: fmt.Sprintf("Now is %v", now)} + require.NoError(t, broker.Publish("local/instance", evt)) + evt = event{Time: now.Add(1 * time.Minute).UnixNano(), Message: fmt.Sprintf("Now is %v", now.Add(1*time.Minute))} + require.NoError(t, broker.Publish("local/instancetest", evt)) + } + }() + + // Test a few rounds to make sure all subscribers get the same messages each round. + for i := 0; i < 5; i++ { + fmt.Print("runtestcycle") + b := <-received2 + a := <-received1 + require.NotNil(t, a) + require.NotEqual(t, "", a.Message) + require.NotEqual(t, a, b) + } + + broker.Stop() + +} + // This tests the case where the broker is mapped to url route (e.g. /events) // In this case, we need to specify the path option in the connection options so that // we can properly connect to the broker at the url prefix. From 364c58a3396244a9bd6069e6cb0d09fcb06a67e1 Mon Sep 17 00:00:00 2001 From: Yuji Oshima Date: Tue, 7 Mar 2017 18:36:04 +0900 Subject: [PATCH 4/5] move the check to a function Signed-off-by: Yuji Oshima --- pkg/broker/server/sse.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/broker/server/sse.go b/pkg/broker/server/sse.go index 7b3a806d1..1392b03a7 100644 --- a/pkg/broker/server/sse.go +++ b/pkg/broker/server/sse.go @@ -160,6 +160,19 @@ func (b *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } } +func (b *Broker) checkPath(subscribedPath string, publishedPath string) bool { + if subscribedPath != "/" { + // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. + pPath := strings.Split(publishedPath, "/") + for i, sliceTopic := range strings.Split(subscribedPath, "/") { + if pPath[i] != sliceTopic { + return false + } + } + } + return true +} + func (b *Broker) run() { for { select { @@ -245,14 +258,8 @@ func (b *Broker) run() { panic("assert-failed") } - if key != "/" { - // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. - notfyTopic := strings.Split(event.topic, "/") - for i, sliceTopic := range strings.Split(key, "/") { - if notfyTopic[i] != sliceTopic { - return false - } - } + if !b.checkPath(key, event.topic) { + return false } for ch := range chset { From 039bb273f1f7124af1f1cce43bc6832844eea63c Mon Sep 17 00:00:00 2001 From: Yuji Oshima Date: Tue, 7 Mar 2017 18:53:23 +0900 Subject: [PATCH 5/5] fix Signed-off-by: Yuji Oshima --- pkg/broker/server/sse.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/broker/server/sse.go b/pkg/broker/server/sse.go index 1392b03a7..f368108ef 100644 --- a/pkg/broker/server/sse.go +++ b/pkg/broker/server/sse.go @@ -160,9 +160,9 @@ func (b *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } } +//checkPath Compare the subscribed topic with the published topic to see if the message can be sent to the client. func (b *Broker) checkPath(subscribedPath string, publishedPath string) bool { if subscribedPath != "/" { - // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. pPath := strings.Split(publishedPath, "/") for i, sliceTopic := range strings.Split(subscribedPath, "/") { if pPath[i] != sliceTopic { @@ -258,6 +258,7 @@ func (b *Broker) run() { panic("assert-failed") } + // Make sure that the topic subscribed to by the client is the upper topic of the topic being notified or the topic exactly matched. if !b.checkPath(key, event.topic) { return false }