Skip to content

Commit

Permalink
fix: add arch picture in readme and delete unused router field. (#1279)
Browse files Browse the repository at this point in the history
* fix

* fix: delete notify
  • Loading branch information
LaurenceLiZhixin authored Jun 27, 2021
1 parent 3596e59 commit 893bfb5
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 187 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Apache Dubbo-go, a Dubbo implementation written in Golang, is born to bridge the

## Architecture

![dubbo go extend](https://dubbogo.github.io/img/doc/dubbo-go-arch.png)
![dubbo go extend](https://dubbogo.github.io/img/doc/dubbo-go3.0-arch.jpg)

Dubbo-go has been implemented most layers of Dubbo, like protocol layer, registry layer, etc. An extension module is applied to Dubbo-go in order to achieve a more flexible architecture. Developers are allowed to implement a customized layer conformed to the layer interface and use then in Dubbo-go via `extension.Set` method without modifying the source code.

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Apache Dubbo Go 语言实现,架起 Java 和 Golang 之间的桥梁,与 gRPC

## 架构

![dubbo go extend](https://dubbogo.github.io/img/doc/dubbo-go-arch.png)
![dubbo go extend](https://dubbogo.github.io/img/doc/dubbo-go3.0-arch.jpg)

Dubbo-go已经实现了Dubbo的大部分层级,包括协议层(protocol layer)、注册层(registry layer))等等。在Dubbo-go中使用了拓展模块(extension module)以实现更灵活的系统架构,开发者可以根据层接口实现一个自定义的层,并在不改动源代码的前提下通过`extension.Set`方法将它应用到Dubbo-go中。

Expand Down
2 changes: 0 additions & 2 deletions cluster/router/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,4 @@ type Chain interface {
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
// GetNotifyChan get notify channel of this chain
GetNotifyChan() chan struct{}
}
62 changes: 5 additions & 57 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package chain
import (
"sort"
"sync"
"time"
)

import (
Expand All @@ -31,7 +30,6 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
Expand All @@ -53,21 +51,6 @@ type RouterChain struct {
builtinRouters []router.PriorityRouter

mutex sync.RWMutex

url *common.URL

// The times of address notification since last update for address cache
count int64
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
notify chan struct{}
// Address cache
cache atomic.Value
}

func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
Expand All @@ -91,9 +74,6 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
go func() {
c.notify <- struct{}{}
}()
}

// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
Expand All @@ -102,10 +82,6 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()

go func() {
c.notify <- struct{}{}
}()
}

// copyRouters make a snapshot copy from RouterChain's router list.
Expand Down Expand Up @@ -142,18 +118,13 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}

chain := &RouterChain{
last: time.Now(),
notify: make(chan struct{}),
}

routers := make([]router.PriorityRouter, 0, len(routerFactories))

for key, routerFactory := range routerFactories {
if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil {
logger.Warnf("virtual Service Config or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n")
}
r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte, chain.notify)
r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err)
continue
Expand All @@ -168,36 +139,13 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {

routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain.routers = newRouters
chain.builtinRouters = routers
if url != nil {
chain.url = url
}

return chain, nil
}

// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
chain := &RouterChain{
routers: newRouters,
builtinRouters: routers,
}

for _, r := range right {
found := false
rurl := r.GetURL()
for _, l := range left {
lurl := l.GetURL()
if common.GetCompareURLEqualFunc()(lurl, rurl, constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
}
if !found {
return true
}
}
return false
return chain, nil
}

// sortRouter Sort router instance by priority with stable algorithm
Expand Down
78 changes: 0 additions & 78 deletions cluster/router/chain/invoker_cache.go

This file was deleted.

2 changes: 1 addition & 1 deletion cluster/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// PriorityRouterFactory creates creates priority router with url
type PriorityRouterFactory interface {
// NewPriorityRouter creates router instance with URL
NewPriorityRouter([]byte, []byte, chan struct{}) (PriorityRouter, error)
NewPriorityRouter([]byte, []byte) (PriorityRouter, error)
}

// Router
Expand Down
31 changes: 0 additions & 31 deletions cluster/router/rule.go

This file was deleted.

4 changes: 2 additions & 2 deletions cluster/router/v3router/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func NewUniformRouterFactory() router.PriorityRouterFactory {
}

// NewPriorityRouter construct a new UniformRouteFactory as PriorityRouter
func (f *UniformRouteFactory) NewPriorityRouter(vsConfigBytes, distConfigBytes []byte, notify chan struct{}) (router.PriorityRouter, error) {
return NewUniformRouterChain(vsConfigBytes, distConfigBytes, notify)
func (f *UniformRouteFactory) NewPriorityRouter(vsConfigBytes, distConfigBytes []byte) (router.PriorityRouter, error) {
return NewUniformRouterChain(vsConfigBytes, distConfigBytes)
}
2 changes: 1 addition & 1 deletion cluster/router/v3router/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func TestUniformRouterFacotry(t *testing.T) {
factory := NewUniformRouterFactory()
assert.NotNil(t, factory)
router, err := factory.NewPriorityRouter([]byte{}, []byte{}, make(chan struct{}))
router, err := factory.NewPriorityRouter([]byte{}, []byte{})
assert.Nil(t, err)
assert.NotNil(t, router)
}
13 changes: 6 additions & 7 deletions cluster/router/v3router/router_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type RouterChain struct {
}

// NewUniformRouterChain return
func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) (router.PriorityRouter, error) {
func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte) (router.PriorityRouter, error) {
fromFileConfig := true
uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig, notify)
uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig)
if err != nil {
fromFileConfig = false
logger.Warnf("parse router config form local file failed, error = %+v", err)
Expand All @@ -59,7 +59,6 @@ func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte, n
virtualServiceConfigBytes: virtualServiceConfig,
destinationRuleConfigBytes: destinationRuleConfig,
routers: uniformRouters,
notify: notify,
}
if err := k8s_api.SetK8sEventListener(r); err != nil {
logger.Warnf("try listen K8s router config failed, error = %+v", err)
Expand Down Expand Up @@ -110,7 +109,7 @@ func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
logger.Error("Process change of virtual service: event.Value marshal error:", err)
return
}
r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes, r.notify)
r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes)
if err != nil {
logger.Error("Process change of virtual service: parseFromConfigToRouters:", err)
return
Expand Down Expand Up @@ -142,7 +141,7 @@ func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
logger.Error("Process change of dest rule: event.Value marshal error:", err)
return
}
r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data, r.notify)
r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data)
if err != nil {
logger.Error("Process change of dest rule: parseFromConfigToRouters:", err)
return
Expand Down Expand Up @@ -174,7 +173,7 @@ func (r *RouterChain) URL() *common.URL {
}

// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) ([]*UniformRouter, error) {
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)

Expand Down Expand Up @@ -234,7 +233,7 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte
logger.Error("Parse config to uniform rule err = ", err)
return nil, err
}
rtr, err := NewUniformRouter(newRule, notify)
rtr, err := NewUniformRouter(newRule)
if err != nil {
logger.Error("new uniform router err = ", err)
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions cluster/router/v3router/router_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
func TestNewUniformRouterChain(t *testing.T) {
vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath)
drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath)
rc, err := NewUniformRouterChain(vsBytes, drBytes, make(chan struct{}))
rc, err := NewUniformRouterChain(vsBytes, drBytes)
assert.Nil(t, err)
assert.NotNil(t, rc)
}
Expand All @@ -72,7 +72,7 @@ type ruleTestItemStruct struct {
func TestParseConfigFromFile(t *testing.T) {
vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath)
drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath)
routers, err := parseFromConfigToRouters(vsBytes, drBytes, make(chan struct{}, 1))
routers, err := parseFromConfigToRouters(vsBytes, drBytes)
fmt.Println(routers, err)
assert.Equal(t, len(routers), 1)
assert.NotNil(t, routers[0].dubboRouter)
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestParseConfigFromFile(t *testing.T) {
func TestRouterChain_Route(t *testing.T) {
vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath)
drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath)
rc, err := NewUniformRouterChain(vsBytes, drBytes, make(chan struct{}))
rc, err := NewUniformRouterChain(vsBytes, drBytes)
assert.Nil(t, err)
assert.NotNil(t, rc)
newGoodURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
Expand Down
4 changes: 1 addition & 3 deletions cluster/router/v3router/uniform_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ const (
// UniformRouter have
type UniformRouter struct {
dubboRouter *DubboRouterRule
notify chan struct{}
}

// NewUniformRouter construct an NewConnCheckRouter via url
func NewUniformRouter(dubboRouter *DubboRouterRule, notify chan struct{}) (*UniformRouter, error) {
func NewUniformRouter(dubboRouter *DubboRouterRule) (*UniformRouter, error) {
r := &UniformRouter{
dubboRouter: dubboRouter,
notify: notify,
}
return r, nil
}
Expand Down

0 comments on commit 893bfb5

Please sign in to comment.