Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Ability to suppress echos from your own connection #698

Merged
merged 3 commits into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ type client struct {
rttStart time.Time

route *route

debug bool
trace bool
echo bool

flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}
Expand Down Expand Up @@ -235,6 +237,7 @@ type subscription struct {
}

type clientOpts struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
TLSRequired bool `json:"tls_required"`
Expand All @@ -247,7 +250,7 @@ type clientOpts struct {
Protocol int `json:"protocol"`
}

var defaultOpts = clientOpts{Verbose: true, Pedantic: true}
var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true}

func init() {
rand.Seed(time.Now().UnixNano())
Expand All @@ -267,6 +270,8 @@ func (c *client) initClient() {
c.out.mp = opts.MaxPending

c.subs = make(map[string]*subscription)
c.echo = true

c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0)
c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0)

Expand Down Expand Up @@ -696,6 +701,7 @@ func (c *client) processConnect(arg []byte) error {
// server now knows which protocol this client supports.
c.flags.set(connectReceived)
// Capture these under lock
c.echo = c.opts.Echo
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
Expand Down Expand Up @@ -1272,6 +1278,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
}
client := sub.client
client.mu.Lock()

// Check echo
if c == client && !client.echo {
client.mu.Unlock()
return false
}

srv := client.srv

sub.nm++
Expand Down
101 changes: 94 additions & 7 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestClientConnect(t *testing.T) {
_, c, _ := setupClient()

// Basic Connect setting flags
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false}\r\n")
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true}) {
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Echo: false}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -155,7 +155,7 @@ func TestClientConnect(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -170,7 +170,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -185,7 +185,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}

if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}
Expand All @@ -202,7 +202,7 @@ func TestClientConnectProto(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

Expand All @@ -215,7 +215,7 @@ func TestClientConnectProto(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
if c.opts.Protocol != ClientProtoInfo {
Expand Down Expand Up @@ -310,6 +310,26 @@ func TestClientSimplePubSub(t *testing.T) {
checkPayload(cr, []byte("hello\r\n"), t)
}

func TestClientPubSubNoEcho(t *testing.T) {
_, c, cr := setupClient()
// Specify no echo
connectOp := []byte("CONNECT {\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
// SUB/PUB
go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
// We should not receive anything but a PONG since we specified no echo.
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
}

func TestClientSimplePubSubWithReply(t *testing.T) {
_, c, cr := setupClient()

Expand Down Expand Up @@ -416,6 +436,73 @@ func TestClientPubWithQueueSub(t *testing.T) {
}
}

func TestClientPubWithQueueSubNoEcho(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()

// Grab the client from server and set no echo by hand.
s.mu.Lock()
lc := len(s.clients)
c := s.clients[s.gcid]
s.mu.Unlock()

if lc != 1 {
t.Fatalf("Expected only 1 client but got %d\n", lc)
}
if c == nil {
t.Fatal("Expected to retrieve client\n")
}
c.mu.Lock()
c.echo = false
c.mu.Unlock()

// Queue sub on nc1.
_, err = nc1.QueueSubscribe("foo", "bar", func(*nats.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc1.Flush()

nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

n := int32(0)
cb := func(m *nats.Msg) {
atomic.AddInt32(&n, 1)
}

_, err = nc2.QueueSubscribe("foo", "bar", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc2.Flush()

// Now publish 100 messages on nc1 which does not allow echo.
for i := 0; i < 100; i++ {
nc1.Publish("foo", []byte("Hello"))
}
nc1.Flush()
nc2.Flush()

checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
num := atomic.LoadInt32(&n)
if num != int32(100) {
return fmt.Errorf("Expected all the msgs to be received by nc2, got %d\n", num)
}
return nil
})
}

func TestClientUnSub(t *testing.T) {
_, c, cr := setupClient()

Expand Down
8 changes: 7 additions & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "1.2.0-beta3"
VERSION = "1.2.0-beta4"

// PROTO is the currently supported protocol.
// 0 was the original
// 1 maintains proto 0, adds echo abilities for CONNECT from the client. Clients
// should not send echo unless proto in INFO is >= 1.
PROTO = 1

// DEFAULT_PORT is the default port for client connections.
DEFAULT_PORT = 4222
Expand Down
2 changes: 2 additions & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type route struct {
}

type connectInfo struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
User string `json:"user,omitempty"`
Expand Down Expand Up @@ -259,6 +260,7 @@ func (c *client) sendConnect(tlsRequired bool) {
pass, _ = userInfo.Password()
}
cinfo := connectInfo{
Echo: true,
Verbose: false,
Pedantic: false,
User: user,
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
type Info struct {
ID string `json:"server_id"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Expand Down Expand Up @@ -144,6 +145,7 @@ func New(opts *Options) *Server {
info := Info{
ID: genID(),
Version: VERSION,
Proto: PROTO,
GitCommit: gitCommit,
GoVersion: runtime.Version(),
Host: opts.Host,
Expand Down