Skip to content

Commit

Permalink
[ISSUE apache#679] Fix judgment of topic route equality and optimize …
Browse files Browse the repository at this point in the history
…loadBalancer (apache#680)

* fix(golang): correct judgment of topic route equality and optimize load balancer

* fix(golang): add tests for routeEqual

* refactor
  • Loading branch information
redlsz authored Feb 26, 2024
1 parent 2eedee0 commit 80d9c07
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 10 deletions.
46 changes: 36 additions & 10 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"encoding/hex"
"errors"
"fmt"
"reflect"
"sync"
"time"

innerMD "github.com/apache/rocketmq-clients/golang/v5/metadata"
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -504,27 +504,40 @@ func (cli *defaultClient) startUp() error {
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
oldRoute := v
newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
if err != nil {
cli.log.Errorf("scheduled queryRoute err=%v", err)
}
if newRoute == nil && oldRoute != nil {
if newRoute == nil && v != nil {
cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
return true
}
if !reflect.DeepEqual(newRoute, oldRoute) {
var oldRoute []*v2.MessageQueue
if v != nil {
oldRoute = v.([]*v2.MessageQueue)
}
if !routeEqual(oldRoute, newRoute) {
cli.router.Store(k, newRoute)
switch impl := cli.clientImpl.(type) {
case *defaultProducer:
plb, err := NewPublishingLoadBalancer(newRoute)
if err == nil {
impl.publishingRouteDataResultCache.Store(topic, plb)
existing, ok := impl.publishingRouteDataResultCache.Load(topic)
if !ok {
plb, err := NewPublishingLoadBalancer(newRoute)
if err == nil {
impl.publishingRouteDataResultCache.Store(topic, plb)
}
} else {
impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
}
case *defaultSimpleConsumer:
slb, err := NewSubscriptionLoadBalancer(newRoute)
if err == nil {
impl.subTopicRouteDataResultCache.Store(topic, slb)
existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
if !ok {
slb, err := NewSubscriptionLoadBalancer(newRoute)
if err == nil {
impl.subTopicRouteDataResultCache.Store(topic, slb)
}
} else {
impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
}
}
}
Expand All @@ -534,6 +547,19 @@ func (cli *defaultClient) startUp() error {
ticker.Tick(f, time.Second*30, cli.done)
return nil
}

func routeEqual(old, new []*v2.MessageQueue) bool {
if len(old) != len(new) {
return false
}
for i := 0; i < len(old); i++ {
if !proto.Equal(old[i], new[i]) {
return false
}
}
return true
}

func (cli *defaultClient) notifyClientTermination() {
cli.log.Info("start notifyClientTermination")
ctx := cli.Sign(context.Background())
Expand Down
46 changes: 46 additions & 0 deletions golang/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package golang
import (
"context"
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -293,3 +294,48 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
}

func Test_routeEqual(t *testing.T) {
oldMq := &v2.MessageQueue{
Topic: &v2.Resource{
Name: "topic-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
Broker: &v2.Broker{
Name: "broker-test",
Id: 0,
Endpoints: fakeEndpoints(),
},
AcceptMessageTypes: []v2.MessageType{
v2.MessageType_NORMAL,
},
}
newMq := &v2.MessageQueue{
Topic: &v2.Resource{
Name: "topic-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
Broker: &v2.Broker{
Name: "broker-test",
Id: 0,
Endpoints: fakeEndpoints(),
},
AcceptMessageTypes: []v2.MessageType{
v2.MessageType_NORMAL,
},
}

newMq.ProtoReflect() // message internal field value will be changed

oldRoute := []*v2.MessageQueue{oldMq}
newRoute := []*v2.MessageQueue{newMq}

assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
assert.Equal(t, true, routeEqual(oldRoute, newRoute))
assert.Equal(t, true, routeEqual(nil, nil))
assert.Equal(t, false, routeEqual(nil, newRoute))
assert.Equal(t, false, routeEqual(oldRoute, nil))
assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{}))
}
16 changes: 16 additions & 0 deletions golang/loadBalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}

type publishingLoadBalancer struct {
Expand Down Expand Up @@ -119,8 +120,16 @@ func (plb *publishingLoadBalancer) TakeMessageQueues(excluded sync.Map, count in
return candidates, nil
}

func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
return &publishingLoadBalancer{
messageQueues: messageQueues,
index: plb.index,
}
}

type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}

type subscriptionLoadBalancer struct {
Expand All @@ -147,3 +156,10 @@ func (slb *subscriptionLoadBalancer) TakeMessageQueue() (*v2.MessageQueue, error
selectMessageQueue := slb.messageQueues[idx]
return selectMessageQueue, nil
}

func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
return &subscriptionLoadBalancer{
messageQueues: messageQueues,
index: slb.index,
}
}

0 comments on commit 80d9c07

Please sign in to comment.