Skip to content

Commit

Permalink
feature nearbyRouteRule v2
Browse files Browse the repository at this point in the history
Signed-off-by: haozhicui <[email protected]>
  • Loading branch information
haozhicui committed Jan 17, 2025
1 parent 9203e8b commit 54a6c6c
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 35 deletions.
14 changes: 13 additions & 1 deletion pkg/flow/base_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/hashicorp/go-multierror"

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

File is not properly formatted (gci)

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.19.x)

File is not properly formatted (gci)

Check failure on line 23 in pkg/flow/base_flow.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

File is not properly formatted (gci)

"github.com/polarismesh/polaris-go/pkg/flow/data"
"github.com/polarismesh/polaris-go/pkg/log"
"github.com/polarismesh/polaris-go/pkg/model"
Expand Down Expand Up @@ -163,6 +162,11 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry,
request.SetDstRoute(routeRule)
trigger.EnableDstRoute = false
}
nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false)
if nearbyRouteRule.IsInitialized() {
request.SetDstRoute(nearbyRouteRule)
trigger.EnableDstRoute = false
}
if load && (routeRule.IsCacheLoaded() || !routeRule.IsInitialized()) {
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute}
log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey)
Expand All @@ -171,6 +175,14 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry,
return nil, err.(model.SDKError)
}
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))

dstRouterKey = &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule}
log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey)
notifier, err = registry.LoadServiceNearByRouteRule(dstService)
if err != nil {
return nil, err.(model.SDKError)
}
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
}
}
if trigger.EnableDstRateLimit {
Expand Down
11 changes: 6 additions & 5 deletions pkg/flow/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (
)

const (
keySourceRoute = "sourceRoute"
keyDstRoute = "destinationRoute"
keyDstRateLimit = "destinationRateLimit"
keyDstInstances = "destinationInstances"
keyDstServices = "destinationServices"
keySourceRoute = "sourceRoute"
keyDstRoute = "destinationRoute"
keyDstRateLimit = "destinationRateLimit"
keyDstInstances = "destinationInstances"
keyDstServices = "destinationServices"
keyDstNearByRouteRule = "destinationNearByRouteRule"
)

// ContextKey 上下文标识
Expand Down
28 changes: 16 additions & 12 deletions pkg/model/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
EventCircuitBreaker EventType = 0x2006
// EventFaultDetect 探测规则
EventFaultDetect EventType = 0x2007
//EventNearbyRouteRule 就近路由事件

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.19.x)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 42 in pkg/model/core.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

commentFormatting: put a space between `//` and comment text (gocritic)
EventNearbyRouteRule EventType = 0x2008
)

// RegistryValue 存储于sdk缓存中的对象,包括服务实例和服务路由
Expand Down Expand Up @@ -66,21 +68,23 @@ func (e EventType) String() string {
var (
// 路由规则到日志回显
eventTypeToPresent = map[EventType]string{
EventInstances: "instance",
EventRouting: "routing",
EventRateLimiting: "rate_limiting",
EventServices: "services",
EventCircuitBreaker: "circuit_breaker",
EventFaultDetect: "fault_detect",
EventInstances: "instance",
EventRouting: "routing",
EventRateLimiting: "rate_limiting",
EventServices: "services",
EventCircuitBreaker: "circuit_breaker",
EventFaultDetect: "fault_detect",
EventNearbyRouteRule: "nearby_route_rule",
}

presentToEventType = map[string]EventType{
"instance": EventInstances,
"routing": EventRouting,
"rate_limiting": EventRateLimiting,
"services": EventServices,
"circuit_breaker": EventCircuitBreaker,
"fault_detect": EventFaultDetect,
"instance": EventInstances,
"routing": EventRouting,
"rate_limiting": EventRateLimiting,
"services": EventServices,
"circuit_breaker": EventCircuitBreaker,
"fault_detect": EventFaultDetect,
"nearby_route_rule": EventNearbyRouteRule,
}
)

Expand Down
13 changes: 13 additions & 0 deletions pkg/model/pb/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"

"github.com/polarismesh/polaris-go/pkg/model"
)
Expand All @@ -43,6 +44,18 @@ func (r *RoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (pr
if nil != routingValue {
revision = routingValue.GetRevision().GetValue()
}

if resp.Routing == nil && len(resp.NearbyRouteRules) >= 1 {
rule := resp.NearbyRouteRules[0]
routing := &apitraffic.Routing{
Namespace: wrapperspb.String(rule.GetNamespace()),
Service: resp.Service.Name,
Rules: resp.NearbyRouteRules,
}

return routing, rule.GetRevision()
}

return routingValue, revision
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/model/pb/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type ServiceRuleAssistant interface {
}

var eventTypeToAssistant = map[model.EventType]ServiceRuleAssistant{
model.EventRouting: &RoutingAssistant{},
model.EventRateLimiting: &RateLimitingAssistant{},
model.EventCircuitBreaker: &CircuitBreakAssistant{},
model.EventFaultDetect: &FaultDetectAssistant{},
model.EventRouting: &RoutingAssistant{},
model.EventRateLimiting: &RateLimitingAssistant{},
model.EventCircuitBreaker: &CircuitBreakAssistant{},
model.EventFaultDetect: &FaultDetectAssistant{},
model.EventNearbyRouteRule: &RoutingAssistant{},
}

// ServiceRuleInProto 路由规则配置对象.
Expand Down
26 changes: 14 additions & 12 deletions pkg/model/pb/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ import (

var (
eventTypeToProtoRequestType = map[model.EventType]apiservice.DiscoverRequest_DiscoverRequestType{
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
model.EventServices: apiservice.DiscoverRequest_SERVICES,
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
model.EventServices: apiservice.DiscoverRequest_SERVICES,
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
model.EventNearbyRouteRule: apiservice.DiscoverRequest_NEARBY_ROUTE_RULE,
}

protoRespTypeToEventType = map[apiservice.DiscoverResponse_DiscoverResponseType]model.EventType{
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
apiservice.DiscoverResponse_SERVICES: model.EventServices,
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
apiservice.DiscoverResponse_SERVICES: model.EventServices,
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
apiservice.DiscoverResponse_NEARBY_ROUTE_RULE: model.EventNearbyRouteRule,
}
)

Expand Down
4 changes: 4 additions & 0 deletions pkg/plugin/localregistry/localregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type RuleRegistry interface {
GetServiceRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceRouteRule 非阻塞发起配置加载
LoadServiceRouteRule(key *model.ServiceKey) (*common.Notifier, error)
// GetServiceNearByRouteRule 非阻塞获取就近路由信息
GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceNearByRouteRule 非阻塞发起就近路由加载
LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error)
// GetServiceRateLimitRule 非阻塞获取限流规则
GetServiceRateLimitRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
// LoadServiceRateLimitRule 非阻塞发起限流规则加载
Expand Down
29 changes: 29 additions & 0 deletions plugin/localregistry/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (g *LocalCache) Init(ctx *plugin.InitContext) error {
g.eventToCacheHandlers = make(map[model.EventType]CacheHandlers, 0)
g.eventToCacheHandlers[model.EventInstances] = g.newServiceCacheHandler()
g.eventToCacheHandlers[model.EventRouting] = g.newRuleCacheHandler()
g.eventToCacheHandlers[model.EventNearbyRouteRule] = g.newNearByRouteRuleCacheHandler()
g.eventToCacheHandlers[model.EventRateLimiting] = g.newRateLimitCacheHandler()
g.eventToCacheHandlers[model.EventCircuitBreaker] = g.newCircuitBreakerCacheHandler()
g.eventToCacheHandlers[model.EventFaultDetect] = g.newFaultDetectCacheHandler()
Expand Down Expand Up @@ -654,6 +655,14 @@ func (g *LocalCache) GetServiceRouteRule(key *model.ServiceKey, includeCache boo
return svcRule
}

// GetServiceNearByRouteRule 非阻塞获取就近路由信息
func (g *LocalCache) GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule {
svcEventKey := poolGetSvcEventKey(key, model.EventNearbyRouteRule)
svcRule := g.GetServiceRule(svcEventKey, includeCache)
poolPutSvcEventKey(svcEventKey)
return svcRule
}

// GetServicesByMeta 非阻塞获取服务列表
func (g *LocalCache) GetServicesByMeta(key *model.ServiceKey, includeCache bool) model.Services {
svcEventKey := poolGetSvcEventKey(key, model.EventServices)
Expand Down Expand Up @@ -725,6 +734,15 @@ func (g *LocalCache) newRuleCacheHandler() CacheHandlers {
}
}

// 创建就近路由规则缓存操作回调集合
func (g *LocalCache) newNearByRouteRuleCacheHandler() CacheHandlers {
return CacheHandlers{
CompareMessage: compareResource,
MessageToCacheValue: messageToServiceRule,
OnEventDeleted: g.deleteRule,
}
}

// 创建限流规则缓存操作回调集合
func (g *LocalCache) newRateLimitCacheHandler() CacheHandlers {
return CacheHandlers{
Expand Down Expand Up @@ -1011,6 +1029,17 @@ func (g *LocalCache) LoadServices(key *model.ServiceKey) (*common.Notifier, erro
})
}

// LoadServiceNearByRouteRule 非阻塞发起配置加载
func (g *LocalCache) LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error) {
return g.LoadServiceRule(&model.ServiceEventKey{
ServiceKey: model.ServiceKey{
Namespace: key.Namespace,
Service: key.Service,
},
Type: model.EventNearbyRouteRule,
})
}

// LoadServiceRateLimitRule 非阻塞发起限流规则加载
func (g *LocalCache) LoadServiceRateLimitRule(key *model.ServiceKey) (*common.Notifier, error) {
return g.LoadServiceRule(&model.ServiceEventKey{
Expand Down
26 changes: 25 additions & 1 deletion plugin/servicerouter/nearbybase/nearby.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package nearbybase
import (
"context"
"fmt"
"github.com/modern-go/reflect2"
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
"strings"
"time"

Expand Down Expand Up @@ -94,7 +96,29 @@ const (
// Enable 当前是否需要启动该服务路由插件
func (g *NearbyBasedInstancesFilter) Enable(routeInfo *servicerouter.RouteInfo, clusters model.ServiceClusters) bool {
location := g.valueCtx.GetCurrentLocation().GetLocation()
return nil != location && clusters.IsNearbyEnabled()
return nil != location && (clusters.IsNearbyEnabled() || g.enableNearByRouteRules(routeInfo))
}

func ruleEmpty(svcRule model.ServiceRule) bool {
return reflect2.IsNil(svcRule) || reflect2.IsNil(svcRule.GetValue()) || svcRule.GetValidateError() != nil
}

func (g *NearbyBasedInstancesFilter) enableNearByRouteRules(routeInfo *servicerouter.RouteInfo) bool {
if ruleEmpty(routeInfo.DestRouteRule) {
return false
}

rt, ok := routeInfo.DestRouteRule.GetValue().(*apitraffic.Routing)
if !ok {
return false
}
for _, rule := range rt.Rules {
if rule.Enable {
return true
}
}

return false
}

// 一个匹配级别的cluster的健康和全部实例数量
Expand Down

0 comments on commit 54a6c6c

Please sign in to comment.