diff --git a/server/client.go b/server/client.go index 4c8e7407806..6042a837921 100644 --- a/server/client.go +++ b/server/client.go @@ -154,8 +154,10 @@ type client struct { rttStart time.Time route *route + debug bool trace bool + echo bool flags clientFlag // Compact booleans into a single field. Size will be increased when needed. } @@ -235,6 +237,7 @@ type subscription struct { } type clientOpts struct { + Echo bool `json:"echo"` Verbose bool `json:"verbose"` Pedantic bool `json:"pedantic"` TLSRequired bool `json:"tls_required"` @@ -247,7 +250,7 @@ type clientOpts struct { Protocol int `json:"protocol"` } -var defaultOpts = clientOpts{Verbose: true, Pedantic: true} +var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true} func init() { rand.Seed(time.Now().UnixNano()) @@ -267,6 +270,8 @@ func (c *client) initClient() { c.out.mp = opts.MaxPending c.subs = make(map[string]*subscription) + c.echo = true + c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0) c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0) @@ -696,6 +701,7 @@ func (c *client) processConnect(arg []byte) error { // server now knows which protocol this client supports. c.flags.set(connectReceived) // Capture these under lock + c.echo = c.opts.Echo proto := c.opts.Protocol verbose := c.opts.Verbose lang := c.opts.Lang @@ -1272,6 +1278,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { } client := sub.client client.mu.Lock() + + // Check echo + if c == client && !client.echo { + client.mu.Unlock() + return false + } + srv := client.srv sub.nm++ diff --git a/server/client_test.go b/server/client_test.go index 32c009bc057..37f8583fe18 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -133,7 +133,7 @@ func TestClientConnect(t *testing.T) { _, c, _ := setupClient() // Basic Connect setting flags - connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false}\r\n") + connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"echo\":false}\r\n") err := c.parse(connectOp) if err != nil { t.Fatalf("Received error: %v\n", err) @@ -141,7 +141,7 @@ func TestClientConnect(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true}) { + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Echo: false}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -155,7 +155,7 @@ func TestClientConnect(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -170,7 +170,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -185,7 +185,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } } @@ -202,7 +202,7 @@ func TestClientConnectProto(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } @@ -215,7 +215,7 @@ func TestClientConnectProto(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } if c.opts.Protocol != ClientProtoInfo { @@ -310,6 +310,26 @@ func TestClientSimplePubSub(t *testing.T) { checkPayload(cr, []byte("hello\r\n"), t) } +func TestClientPubSubNoEcho(t *testing.T) { + _, c, cr := setupClient() + // Specify no echo + connectOp := []byte("CONNECT {\"echo\":false}\r\n") + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received error: %v\n", err) + } + // SUB/PUB + go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) + l, err := cr.ReadString('\n') + if err != nil { + t.Fatalf("Error receiving msg from server: %v\n", err) + } + // We should not receive anything but a PONG since we specified no echo. + if !strings.HasPrefix(l, "PONG\r\n") { + t.Fatalf("PONG response incorrect: %q\n", l) + } +} + func TestClientSimplePubSubWithReply(t *testing.T) { _, c, cr := setupClient() @@ -416,6 +436,73 @@ func TestClientPubWithQueueSub(t *testing.T) { } } +func TestClientPubWithQueueSubNoEcho(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + // Grab the client from server and set no echo by hand. + s.mu.Lock() + lc := len(s.clients) + c := s.clients[s.gcid] + s.mu.Unlock() + + if lc != 1 { + t.Fatalf("Expected only 1 client but got %d\n", lc) + } + if c == nil { + t.Fatal("Expected to retrieve client\n") + } + c.mu.Lock() + c.echo = false + c.mu.Unlock() + + // Queue sub on nc1. + _, err = nc1.QueueSubscribe("foo", "bar", func(*nats.Msg) {}) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc1.Flush() + + nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + n := int32(0) + cb := func(m *nats.Msg) { + atomic.AddInt32(&n, 1) + } + + _, err = nc2.QueueSubscribe("foo", "bar", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc2.Flush() + + // Now publish 100 messages on nc1 which does not allow echo. + for i := 0; i < 100; i++ { + nc1.Publish("foo", []byte("Hello")) + } + nc1.Flush() + nc2.Flush() + + checkFor(t, 5*time.Second, 10*time.Millisecond, func() error { + num := atomic.LoadInt32(&n) + if num != int32(100) { + return fmt.Errorf("Expected all the msgs to be received by nc2, got %d\n", num) + } + return nil + }) +} + func TestClientUnSub(t *testing.T) { _, c, cr := setupClient() diff --git a/server/const.go b/server/const.go index bb4a30127e9..05cd6da8054 100644 --- a/server/const.go +++ b/server/const.go @@ -35,7 +35,13 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "1.2.0-beta3" + VERSION = "1.2.0-beta4" + + // PROTO is the currently supported protocol. + // 0 was the original + // 1 maintains proto 0, adds echo abilities for CONNECT from the client. Clients + // should not send echo unless proto in INFO is >= 1. + PROTO = 1 // DEFAULT_PORT is the default port for client connections. DEFAULT_PORT = 4222 diff --git a/server/route.go b/server/route.go index 9d938bad955..c12a713da75 100644 --- a/server/route.go +++ b/server/route.go @@ -53,6 +53,7 @@ type route struct { } type connectInfo struct { + Echo bool `json:"echo"` Verbose bool `json:"verbose"` Pedantic bool `json:"pedantic"` User string `json:"user,omitempty"` @@ -259,6 +260,7 @@ func (c *client) sendConnect(tlsRequired bool) { pass, _ = userInfo.Password() } cinfo := connectInfo{ + Echo: true, Verbose: false, Pedantic: false, User: user, diff --git a/server/server.go b/server/server.go index 10fa85fdd9f..35e0e3733d7 100644 --- a/server/server.go +++ b/server/server.go @@ -41,6 +41,7 @@ import ( type Info struct { ID string `json:"server_id"` Version string `json:"version"` + Proto int `json:"proto"` GitCommit string `json:"git_commit,omitempty"` GoVersion string `json:"go"` Host string `json:"host"` @@ -144,6 +145,7 @@ func New(opts *Options) *Server { info := Info{ ID: genID(), Version: VERSION, + Proto: PROTO, GitCommit: gitCommit, GoVersion: runtime.Version(), Host: opts.Host,