[FIXED] LeafNode: possible delivery to several DQ members across Gateway #6517

Merged 1 commit on Feb 17, 2025
6 changes: 3 additions & 3 deletions server/client.go
@@ -4086,7 +4086,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver
}

// Check to see if we did not deliver to anyone and the client has a reply subject set
@@ -4133,7 +4133,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil)
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false)
}
return true
}
@@ -4509,7 +4509,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
flags |= pmrCollectQueueNames
var queues [][]byte
didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver
} else {
didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
}
24 changes: 22 additions & 2 deletions server/gateway.go
@@ -2511,8 +2511,13 @@ var subPool = &sync.Pool{
// that the message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in the account or
// subject, etc..
// When invoked from a LEAF connection, `checkLeafQF` should be passed as `true`
// so that we skip any queue subscription interest that is not part of the
// `c.pa.queues` filter (similar to what we do in `processMsgResults`). However,
// when processing service imports, this boolean should be passed as `false`,
// regardless of whether it is a LEAF connection or not.
// <Invoked from any client connection's readLoop>
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool {
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool {
// We had some times when we were sending across a GW with no subject, and the other side would break
// due to parser error. These need to be fixed upstream but also double check here.
if len(subject) == 0 {
@@ -2597,6 +2602,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
qsubs := qr.qsubs[i]
if len(qsubs) > 0 {
queue := qsubs[0].queue
if checkLeafQF {
// Skip any queue that is not in the leaf's queue filter.
skip := true
for _, qn := range c.pa.queues {
if bytes.Equal(queue, qn) {
skip = false
break
}
}
if skip {
continue
}
// Now we still need to check that it was not delivered
// locally by checking the given `qgroups`.
}
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
@@ -2994,7 +3014,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// we now need to send the message with the real subject to
// gateways in case they have interest on that reply subject.
if !isServiceReply {
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues)
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false)
}
} else if c.kind == GATEWAY {
// Only if we are a gateway connection should we try to route
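For readers skimming the diff, the new `checkLeafQF` branch in `sendMsgToGateways` can be read in isolation. Below is a minimal standalone sketch of that filtering logic; the helper name `shouldForwardQueue` and its parameters are illustrative stand-ins, not part of the server code. A queue group with interest on the remote gateway is only forwarded if it appears in the inbound LEAF message's queue filter (`c.pa.queues`) and has not already been delivered locally (the existing `qgroups` check).

package main

import (
	"bytes"
	"fmt"
)

// shouldForwardQueue mirrors the branch added in sendMsgToGateways above.
// queue is a queue group with interest on the remote gateway,
// leafQueueFilter stands in for c.pa.queues (consulted when checkLeafQF is true),
// and deliveredLocally stands in for the qgroups dedup list.
func shouldForwardQueue(queue []byte, leafQueueFilter, deliveredLocally [][]byte) bool {
	// Skip any queue that is not in the leaf's queue filter.
	skip := true
	for _, qn := range leafQueueFilter {
		if bytes.Equal(queue, qn) {
			skip = false
			break
		}
	}
	if skip {
		return false
	}
	// Now check that it was not already delivered locally.
	for _, qn := range deliveredLocally {
		if bytes.Equal(queue, qn) {
			return false
		}
	}
	return true
}

func main() {
	filter := [][]byte{[]byte("bar")}
	fmt.Println(shouldForwardQueue([]byte("bar"), filter, nil))                     // true: in filter, not served locally
	fmt.Println(shouldForwardQueue([]byte("bar"), filter, [][]byte{[]byte("bar")})) // false: already delivered locally
	fmt.Println(shouldForwardQueue([]byte("baz"), filter, nil))                     // false: not in the leaf's filter
}

Note that only `processInboundLeafMsg` (in leafnode.go below) passes `checkLeafQF` as `true`; the client, gateway-reply, and service-import call sites in this PR all pass `false`, matching the comment added above.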
2 changes: 1 addition & 1 deletion server/leafnode.go
@@ -2863,7 +2863,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {

// Now deal with gateways
if c.srv.gateway.enabled {
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames)
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
}
}

287 changes: 287 additions & 0 deletions server/leafnode_test.go
@@ -4554,6 +4554,289 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) {
}
}

func TestLeafNodeAndGatewaysSingleMsgPerQueueGroup(t *testing.T) {
SetGatewaysSolicitDelay(0)
defer ResetGatewaysSolicitDelay()

accs := `
accounts {
SYS: {users: [{user:sys, password: pwd}]}
USER: {users: [{user:user, password: pwd}]}
}
system_account: SYS
`
gwUSConfTmpl := `
%s
server_name: GW_US
listen: "127.0.0.1:-1"
gateway {
name: US
listen: "127.0.0.1:-1"
}
leafnodes {
listen: "127.0.0.1:-1"
}
`
gwUSConf := createConfFile(t, []byte(fmt.Sprintf(gwUSConfTmpl, accs)))
gwUS, gwUSOpts := RunServerWithConfig(gwUSConf)
defer gwUS.Shutdown()

gwEUConfTmpl := `
%s
server_name: GW_EU
listen: "127.0.0.1:-1"
gateway {
name: EU
listen: "127.0.0.1:-1"
gateways [
{
name: US
url: "nats://127.0.0.1:%d"
}
]
}
leafnodes {
listen: "127.0.0.1:-1"
}
`
gwEUConf := createConfFile(t, []byte(fmt.Sprintf(gwEUConfTmpl, accs, gwUSOpts.Gateway.Port)))
gwEU, gwEUOpts := RunServerWithConfig(gwEUConf)
defer gwEU.Shutdown()

waitForOutboundGateways(t, gwUS, 1, time.Second)
waitForOutboundGateways(t, gwEU, 1, time.Second)
waitForInboundGateways(t, gwUS, 1, time.Second)
waitForInboundGateways(t, gwEU, 1, time.Second)

leafConfTmpl := `
%s
server_name: %s
listen: "127.0.0.1:-1"
leafnodes {
remotes [
{
url: "nats://user:[email protected]:%d"
account: USER
}
]
}
`
leafUSConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_US", gwUSOpts.LeafNode.Port)))
leafUS, _ := RunServerWithConfig(leafUSConf)
defer leafUS.Shutdown()
checkLeafNodeConnected(t, leafUS)

leafEUConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_EU", gwEUOpts.LeafNode.Port)))
leafEU, _ := RunServerWithConfig(leafEUConf)
defer leafEU.Shutdown()
checkLeafNodeConnected(t, leafEU)

// Order is important! (see rest of test to understand why)
var usLeafQ, usLeafPS, usGWQ, usGWPS, euGWQ, euGWPS, euLeafQ, euLeafPS, euLeafQBaz atomic.Int32
counters := []*atomic.Int32{&usLeafQ, &usLeafPS, &usGWQ, &usGWPS, &euGWQ, &euGWPS, &euLeafQ, &euLeafPS, &euLeafQBaz}
counterNames := []string{"usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"}
if len(counters) != len(counterNames) {
panic("Fix test!")
}
resetCounters := func() {
for _, a := range counters {
a.Store(0)
}
}

// This test will always produce from the US leaf.
ncProd := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncProd.Close()

total := int32(1)
check := func(t *testing.T, expected []int32) {
time.Sleep(50 * time.Millisecond)
resetCounters()
for i := 0; i < int(total); i++ {
natsPub(t, ncProd, "foo.1", []byte("hello"))
}
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
for i := 0; i < len(expected); i++ {
if n := counters[i].Load(); n != expected[i] {
return fmt.Errorf("Expected counter %q to be %v, got %v", counterNames[i], expected[i], n)
}
}
return nil
})
}

// "usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"
for _, test := range []struct {
subs []int
expected []int32
}{
// We will always have the qsub on leaf EU, and have some permutations
// of queue and plain subs on other server(s) and check we get the
// expected distribution.

// Simple test first: qsubs on leaf US and leaf EU, all messages stay in leaf US.
{
[]int{1, 0, 0, 0, 0, 0, 1, 0, 0},
[]int32{total, 0, 0, 0, 0, 0, 0, 0, 0},
},
// Move the queue sub from leaf US to GW US.
{
[]int{0, 0, 1, 0, 0, 0, 1, 0, 0},
[]int32{0, 0, total, 0, 0, 0, 0, 0, 0},
},
// Now move it to GW EU.
{
[]int{0, 0, 0, 0, 1, 0, 1, 0, 0},
[]int32{0, 0, 0, 0, total, 0, 0, 0, 0},
},

// More combinations...
{
[]int{1, 1, 0, 0, 0, 0, 1, 0, 0},
[]int32{total, total, 0, 0, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 1, 0, 0, 0, 1, 0, 0},
[]int32{0, total, total, 0, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 1, 1, 0, 0, 1, 0, 0},
[]int32{0, total, total, total, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 0, 1, 1, 0, 1, 0, 0},
[]int32{0, total, 0, total, total, 0, 0, 0, 0},
},
{
[]int{0, 1, 0, 1, 1, 1, 1, 0, 0},
[]int32{0, total, 0, total, total, total, 0, 0, 0},
},
// If we have the qsub in leaf US, it does not matter if we have
// qsubs in GW US and EU: only leaf US should receive the messages,
// but plain subs in the GW servers should get them too.
{
[]int{1, 1, 1, 0, 0, 0, 1, 0, 0},
[]int32{total, total, 0, 0, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 0, 0, 1, 0, 0},
[]int32{total, total, 0, total, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 1, 0, 1, 0, 0},
[]int32{total, total, 0, total, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 1, 1, 1, 0, 0},
[]int32{total, total, 0, total, 0, total, 0, 0, 0},
},
// Now back to a qsub on leaf US and leaf EU, but introduce plain sub
// interest in leaf EU.
{
[]int{1, 0, 0, 0, 0, 0, 1, 1, 0},
[]int32{total, 0, 0, 0, 0, 0, 0, total, 0},
},
// And add a different queue group in leaf EU and it should get the messages too.
{
[]int{1, 0, 0, 0, 0, 0, 1, 1, 1},
[]int32{total, 0, 0, 0, 0, 0, 0, total, total},
},
// Keep plain and baz queue sub interests in leaf EU and add more combinations.
{
[]int{1, 1, 0, 0, 0, 0, 1, 1, 1},
[]int32{total, total, 0, 0, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 0, 0, 0, 1, 1, 1},
[]int32{total, total, 0, 0, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 0, 0, 1, 1, 1},
[]int32{total, total, 0, total, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 1, 0, 1, 1, 1},
[]int32{total, total, 0, total, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 1, 1, 1, 1, 1},
[]int32{total, total, 0, total, 0, total, 0, total, total},
},
} {
t.Run(_EMPTY_, func(t *testing.T) {
if len(test.subs) != len(counters) || len(test.expected) != len(counters) {
panic("Fix test")
}

ncUS := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncUS.Close()

ncGWUS := natsConnect(t, gwUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncGWUS.Close()

ncGWEU := natsConnect(t, gwEU.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncGWEU.Close()

ncEU := natsConnect(t, leafEU.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncEU.Close()

if test.subs[0] > 0 {
natsQueueSub(t, ncUS, "foo.*", "bar", func(_ *nats.Msg) {
usLeafQ.Add(1)
})
}
if test.subs[1] > 0 {
natsSub(t, ncUS, "foo.>", func(_ *nats.Msg) {
usLeafPS.Add(1)
})
}
natsFlush(t, ncUS)
if test.subs[2] > 0 {
natsQueueSub(t, ncGWUS, "foo.*", "bar", func(_ *nats.Msg) {
usGWQ.Add(1)
})
}
if test.subs[3] > 0 {
natsSub(t, ncGWUS, "foo.>", func(_ *nats.Msg) {
usGWPS.Add(1)
})
}
natsFlush(t, ncGWUS)
if test.subs[4] > 0 {
natsQueueSub(t, ncGWEU, "foo.*", "bar", func(_ *nats.Msg) {
euGWQ.Add(1)
})
}
if test.subs[5] > 0 {
natsSub(t, ncGWEU, "foo.>", func(_ *nats.Msg) {
euGWPS.Add(1)
})
}
natsFlush(t, ncGWEU)
if test.subs[6] > 0 {
natsQueueSub(t, ncEU, "foo.*", "bar", func(_ *nats.Msg) {
euLeafQ.Add(1)
})
}
if test.subs[7] > 0 {
natsSub(t, ncEU, "foo.>", func(_ *nats.Msg) {
euLeafPS.Add(1)
})
}
if test.subs[8] > 0 {
// Create one on a different queue group, called "baz".
natsQueueSub(t, ncEU, "foo.*", "baz", func(_ *nats.Msg) {
euLeafQBaz.Add(1)
})
}
natsFlush(t, ncEU)

// Check that we have what we expect.
check(t, test.expected)
})
}
}

func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) {
SetGatewaysSolicitDelay(0)
defer ResetGatewaysSolicitDelay()
@@ -8109,6 +8392,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
subs2 = append(subs2, sub)
nc.Flush()
}
// Let them propagate.
time.Sleep(100 * time.Millisecond)
defer closeSubs(subs1)
defer closeSubs(subs2)

@@ -8150,6 +8435,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
nc.Flush()
rsubs = append(rsubs, sub)
}
// Let them propagate.
time.Sleep(100 * time.Millisecond)

nc, _ = jsClientConnect(t, ln.randomServer())
defer nc.Close()