Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature nearbyRouteRule v2 #230

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/flow/base_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import (
"fmt"

"github.com/hashicorp/go-multierror"

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

File is not properly formatted (gci)

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.19.x)

File is not properly formatted (gci)

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

File is not properly formatted (gci)

"github.com/polarismesh/polaris-go/pkg/flow/data"
"github.com/polarismesh/polaris-go/pkg/log"
"github.com/polarismesh/polaris-go/pkg/model"
Expand Down Expand Up @@ -163,6 +162,11 @@
request.SetDstRoute(routeRule)
trigger.EnableDstRoute = false
}
nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false)
if nearbyRouteRule.IsInitialized() {
request.SetDstRoute(nearbyRouteRule)
trigger.EnableDstRoute = false
}
if load && (routeRule.IsCacheLoaded() || !routeRule.IsInitialized()) {
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute}
log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey)
Expand All @@ -171,6 +175,14 @@
return nil, err.(model.SDKError)
}
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))

dstRouterKey = &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule}
log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey)
notifier, err = registry.LoadServiceNearByRouteRule(dstService)
if err != nil {
return nil, err.(model.SDKError)
}
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
}
}
if trigger.EnableDstRateLimit {
Expand Down
11 changes: 6 additions & 5 deletions pkg/flow/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (
)

const (
keySourceRoute = "sourceRoute"
keyDstRoute = "destinationRoute"
keyDstRateLimit = "destinationRateLimit"
keyDstInstances = "destinationInstances"
keyDstServices = "destinationServices"
keySourceRoute = "sourceRoute"
keyDstRoute = "destinationRoute"
keyDstRateLimit = "destinationRateLimit"
keyDstInstances = "destinationInstances"
keyDstServices = "destinationServices"
keyDstNearByRouteRule = "destinationNearByRouteRule"
)

// ContextKey 上下文标识
Expand Down
28 changes: 16 additions & 12 deletions pkg/model/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
EventCircuitBreaker EventType = 0x2006
// EventFaultDetect 探测规则
EventFaultDetect EventType = 0x2007
//EventNearbyRouteRule 就近路由事件

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.19.x)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

commentFormatting: put a space between `//` and comment text (gocritic)
EventNearbyRouteRule EventType = 0x2008
)

// RegistryValue 存储于sdk缓存中的对象,包括服务实例和服务路由
Expand Down Expand Up @@ -66,21 +68,23 @@
var (
// 路由规则到日志回显
eventTypeToPresent = map[EventType]string{
EventInstances: "instance",
EventRouting: "routing",
EventRateLimiting: "rate_limiting",
EventServices: "services",
EventCircuitBreaker: "circuit_breaker",
EventFaultDetect: "fault_detect",
EventInstances: "instance",
EventRouting: "routing",
EventRateLimiting: "rate_limiting",
EventServices: "services",
EventCircuitBreaker: "circuit_breaker",
EventFaultDetect: "fault_detect",
EventNearbyRouteRule: "nearby_route_rule",
}

presentToEventType = map[string]EventType{
"instance": EventInstances,
"routing": EventRouting,
"rate_limiting": EventRateLimiting,
"services": EventServices,
"circuit_breaker": EventCircuitBreaker,
"fault_detect": EventFaultDetect,
"instance": EventInstances,
"routing": EventRouting,
"rate_limiting": EventRateLimiting,
"services": EventServices,
"circuit_breaker": EventCircuitBreaker,
"fault_detect": EventFaultDetect,
"nearby_route_rule": EventNearbyRouteRule,
}
)

Expand Down
13 changes: 13 additions & 0 deletions pkg/model/pb/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"

"github.com/polarismesh/polaris-go/pkg/model"
)
Expand All @@ -43,6 +44,18 @@ func (r *RoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (pr
if nil != routingValue {
revision = routingValue.GetRevision().GetValue()
}

if resp.Routing == nil && len(resp.NearbyRouteRules) >= 1 {
rule := resp.NearbyRouteRules[0]
routing := &apitraffic.Routing{
Namespace: wrapperspb.String(rule.GetNamespace()),
Service: resp.Service.Name,
Rules: resp.NearbyRouteRules,
}

return routing, rule.GetRevision()
}

return routingValue, revision
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/model/pb/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type ServiceRuleAssistant interface {
}

var eventTypeToAssistant = map[model.EventType]ServiceRuleAssistant{
model.EventRouting: &RoutingAssistant{},
model.EventRateLimiting: &RateLimitingAssistant{},
model.EventCircuitBreaker: &CircuitBreakAssistant{},
model.EventFaultDetect: &FaultDetectAssistant{},
model.EventRouting: &RoutingAssistant{},
model.EventRateLimiting: &RateLimitingAssistant{},
model.EventCircuitBreaker: &CircuitBreakAssistant{},
model.EventFaultDetect: &FaultDetectAssistant{},
model.EventNearbyRouteRule: &RoutingAssistant{},
}

// ServiceRuleInProto 路由规则配置对象.
Expand Down
26 changes: 14 additions & 12 deletions pkg/model/pb/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ import (

var (
eventTypeToProtoRequestType = map[model.EventType]apiservice.DiscoverRequest_DiscoverRequestType{
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
model.EventServices: apiservice.DiscoverRequest_SERVICES,
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
model.EventServices: apiservice.DiscoverRequest_SERVICES,
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
model.EventNearbyRouteRule: apiservice.DiscoverRequest_NEARBY_ROUTE_RULE,
}

protoRespTypeToEventType = map[apiservice.DiscoverResponse_DiscoverResponseType]model.EventType{
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
apiservice.DiscoverResponse_SERVICES: model.EventServices,
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
apiservice.DiscoverResponse_SERVICES: model.EventServices,
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
apiservice.DiscoverResponse_NEARBY_ROUTE_RULE: model.EventNearbyRouteRule,
}
)

Expand Down
4 changes: 4 additions & 0 deletions pkg/plugin/localregistry/localregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type RuleRegistry interface {
GetServiceRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceRouteRule 非阻塞发起配置加载
LoadServiceRouteRule(key *model.ServiceKey) (*common.Notifier, error)
// GetServiceNearByRouteRule 非阻塞获取就近路由信息
GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceNearByRouteRule 非阻塞发起就近路由加载
LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error)
// GetServiceRateLimitRule 非阻塞获取限流规则
GetServiceRateLimitRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceRateLimitRule 非阻塞发起限流规则加载
Expand Down
29 changes: 29 additions & 0 deletions plugin/localregistry/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (g *LocalCache) Init(ctx *plugin.InitContext) error {
g.eventToCacheHandlers = make(map[model.EventType]CacheHandlers, 0)
g.eventToCacheHandlers[model.EventInstances] = g.newServiceCacheHandler()
g.eventToCacheHandlers[model.EventRouting] = g.newRuleCacheHandler()
g.eventToCacheHandlers[model.EventNearbyRouteRule] = g.newNearByRouteRuleCacheHandler()
g.eventToCacheHandlers[model.EventRateLimiting] = g.newRateLimitCacheHandler()
g.eventToCacheHandlers[model.EventCircuitBreaker] = g.newCircuitBreakerCacheHandler()
g.eventToCacheHandlers[model.EventFaultDetect] = g.newFaultDetectCacheHandler()
Expand Down Expand Up @@ -654,6 +655,14 @@ func (g *LocalCache) GetServiceRouteRule(key *model.ServiceKey, includeCache boo
return svcRule
}

// GetServiceNearByRouteRule 非阻塞获取就近路由信息
func (g *LocalCache) GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule {
svcEventKey := poolGetSvcEventKey(key, model.EventNearbyRouteRule)
svcRule := g.GetServiceRule(svcEventKey, includeCache)
poolPutSvcEventKey(svcEventKey)
return svcRule
}

// GetServicesByMeta 非阻塞获取服务列表
func (g *LocalCache) GetServicesByMeta(key *model.ServiceKey, includeCache bool) model.Services {
svcEventKey := poolGetSvcEventKey(key, model.EventServices)
Expand Down Expand Up @@ -725,6 +734,15 @@ func (g *LocalCache) newRuleCacheHandler() CacheHandlers {
}
}

// 创建就近路由规则缓存操作回调集合
func (g *LocalCache) newNearByRouteRuleCacheHandler() CacheHandlers {
return CacheHandlers{
CompareMessage: compareResource,
MessageToCacheValue: messageToServiceRule,
OnEventDeleted: g.deleteRule,
}
}

// 创建限流规则缓存操作回调集合
func (g *LocalCache) newRateLimitCacheHandler() CacheHandlers {
return CacheHandlers{
Expand Down Expand Up @@ -1011,6 +1029,17 @@ func (g *LocalCache) LoadServices(key *model.ServiceKey) (*common.Notifier, erro
})
}

// LoadServiceNearByRouteRule 非阻塞发起配置加载
func (g *LocalCache) LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error) {
return g.LoadServiceRule(&model.ServiceEventKey{
ServiceKey: model.ServiceKey{
Namespace: key.Namespace,
Service: key.Service,
},
Type: model.EventNearbyRouteRule,
})
}

// LoadServiceRateLimitRule 非阻塞发起限流规则加载
func (g *LocalCache) LoadServiceRateLimitRule(key *model.ServiceKey) (*common.Notifier, error) {
return g.LoadServiceRule(&model.ServiceEventKey{
Expand Down
26 changes: 25 additions & 1 deletion plugin/servicerouter/nearbybase/nearby.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package nearbybase
import (
"context"
"fmt"
"github.com/modern-go/reflect2"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
"strings"
"time"

Expand Down Expand Up @@ -94,7 +96,29 @@ const (
// Enable 当前是否需要启动该服务路由插件
func (g *NearbyBasedInstancesFilter) Enable(routeInfo *servicerouter.RouteInfo, clusters model.ServiceClusters) bool {
location := g.valueCtx.GetCurrentLocation().GetLocation()
return nil != location && clusters.IsNearbyEnabled()
return nil != location && (clusters.IsNearbyEnabled() || g.enableNearByRouteRules(routeInfo))
}

func ruleEmpty(svcRule model.ServiceRule) bool {
return reflect2.IsNil(svcRule) || reflect2.IsNil(svcRule.GetValue()) || svcRule.GetValidateError() != nil
}

func (g *NearbyBasedInstancesFilter) enableNearByRouteRules(routeInfo *servicerouter.RouteInfo) bool {
if ruleEmpty(routeInfo.DestRouteRule) {
return false
}

rt, ok := routeInfo.DestRouteRule.GetValue().(*apitraffic.Routing)
if !ok {
return false
}
for _, rule := range rt.Rules {
if rule.Enable {
return true
}
}

return false
}

// 一个匹配级别的cluster的健康和全部实例数量
Expand Down
Loading