Skip to content

Commit

Permalink
Add no echo feature
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Jun 29, 2018
1 parent e17ccb8 commit 061788a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 8 deletions.
15 changes: 14 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ type client struct {
rttStart time.Time

route *route

debug bool
trace bool
echo bool

flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}
Expand Down Expand Up @@ -235,6 +237,7 @@ type subscription struct {
}

type clientOpts struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
TLSRequired bool `json:"tls_required"`
Expand All @@ -247,7 +250,7 @@ type clientOpts struct {
Protocol int `json:"protocol"`
}

var defaultOpts = clientOpts{Verbose: true, Pedantic: true}
var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true}

func init() {
rand.Seed(time.Now().UnixNano())
Expand All @@ -267,6 +270,8 @@ func (c *client) initClient() {
c.out.mp = opts.MaxPending

c.subs = make(map[string]*subscription)
c.echo = true

c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0)
c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0)

Expand Down Expand Up @@ -696,6 +701,7 @@ func (c *client) processConnect(arg []byte) error {
// server now knows which protocol this client supports.
c.flags.set(connectReceived)
// Capture these under lock
c.echo = c.opts.Echo
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
Expand Down Expand Up @@ -1272,6 +1278,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
}
client := sub.client
client.mu.Lock()

// Check echo
if c == client && !client.echo {
client.mu.Unlock()
return false
}

srv := client.srv

sub.nm++
Expand Down
96 changes: 89 additions & 7 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestClientConnect(t *testing.T) {
_, c, _ := setupClient()

// Basic Connect setting flags
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false}\r\n")
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Echo: false}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -155,7 +155,7 @@ func TestClientConnect(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -170,7 +170,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -185,7 +185,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}
Expand All @@ -202,7 +202,7 @@ func TestClientConnectProto(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -215,7 +215,7 @@ func TestClientConnectProto(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
if c.opts.Protocol != ClientProtoInfo {
Expand Down Expand Up @@ -310,6 +310,26 @@ func TestClientSimplePubSub(t *testing.T) {
checkPayload(cr, []byte("hello\r\n"), t)
}

func TestClientPubSubNoEcho(t *testing.T) {
_, c, cr := setupClient()
// Specify no echo
connectOp := []byte("CONNECT {\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
// SUB/PUB
go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
// We should not receive anything but a PONG since we specified no echo.
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
}

func TestClientSimplePubSubWithReply(t *testing.T) {
_, c, cr := setupClient()

Expand Down Expand Up @@ -416,6 +436,68 @@ func TestClientPubWithQueueSub(t *testing.T) {
}
}

func TestClientPubWithQueueSubNoEcho(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()

// Grab the client from server and set no echo by hand.
s.mu.Lock()
lc := len(s.clients)
c := s.clients[s.gcid]
s.mu.Unlock()

if lc != 1 {
t.Fatalf("Expected only 1 client but got %d\n", lc)
}
if c == nil {
t.Fatal("Expected to retrieve client\n")
}
c.mu.Lock()
c.echo = false
c.mu.Unlock()

// Queue sub on nc1.
_, err = nc1.QueueSubscribe("foo", "bar", func(*nats.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

n := int32(0)
cb := func(m *nats.Msg) {
atomic.AddInt32(&n, 1)
}

_, err = nc2.QueueSubscribe("foo", "bar", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

// Now publish 100 messages.
for i := 0; i < 100; i++ {
nc1.Publish("foo", []byte("Hello"))
}
nc1.Flush()
nc2.Flush()

num := atomic.LoadInt32(&n)
if num != int32(100) {
t.Fatalf("Expected all the msgs to be received by nc2, got %d\n", num)
}
}

func TestClientUnSub(t *testing.T) {
_, c, cr := setupClient()

Expand Down
2 changes: 2 additions & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type route struct {
}

type connectInfo struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
User string `json:"user,omitempty"`
Expand Down Expand Up @@ -259,6 +260,7 @@ func (c *client) sendConnect(tlsRequired bool) {
pass, _ = userInfo.Password()
}
cinfo := connectInfo{
Echo: true,
Verbose: false,
Pedantic: false,
User: user,
Expand Down

0 comments on commit 061788a

Please sign in to comment.