From 54a6c6c3cf3cb0cc0231a2de8f4e0aef883f3717 Mon Sep 17 00:00:00 2001 From: haozhicui Date: Fri, 17 Jan 2025 08:10:58 +0800 Subject: [PATCH] feature nearbyRouteRule v2 Signed-off-by: haozhicui --- pkg/flow/base_flow.go | 14 ++++++++++- pkg/flow/notify.go | 11 +++++---- pkg/model/core.go | 28 ++++++++++++---------- pkg/model/pb/routing.go | 13 ++++++++++ pkg/model/pb/rule.go | 9 +++---- pkg/model/pb/validate.go | 26 ++++++++++---------- pkg/plugin/localregistry/localregistry.go | 4 ++++ plugin/localregistry/inmemory/inmemory.go | 29 +++++++++++++++++++++++ plugin/servicerouter/nearbybase/nearby.go | 26 +++++++++++++++++++- 9 files changed, 125 insertions(+), 35 deletions(-) diff --git a/pkg/flow/base_flow.go b/pkg/flow/base_flow.go index e412f5dc..66ae1f16 100644 --- a/pkg/flow/base_flow.go +++ b/pkg/flow/base_flow.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/hashicorp/go-multierror" - "github.com/polarismesh/polaris-go/pkg/flow/data" "github.com/polarismesh/polaris-go/pkg/log" "github.com/polarismesh/polaris-go/pkg/model" @@ -163,6 +162,11 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry, request.SetDstRoute(routeRule) trigger.EnableDstRoute = false } + nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false) + if nearbyRouteRule.IsInitialized() { + request.SetDstRoute(nearbyRouteRule) + trigger.EnableDstRoute = false + } if load && (routeRule.IsCacheLoaded() || !routeRule.IsInitialized()) { dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute} log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey) @@ -171,6 +175,14 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry, return nil, err.(model.SDKError) } notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier)) + + dstRouterKey = &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule} + log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey) + notifier, err = registry.LoadServiceNearByRouteRule(dstService) + if err != nil { + return nil, err.(model.SDKError) + } + notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier)) } } if trigger.EnableDstRateLimit { diff --git a/pkg/flow/notify.go b/pkg/flow/notify.go index 0c081109..f8cb6971 100644 --- a/pkg/flow/notify.go +++ b/pkg/flow/notify.go @@ -30,11 +30,12 @@ import ( ) const ( - keySourceRoute = "sourceRoute" - keyDstRoute = "destinationRoute" - keyDstRateLimit = "destinationRateLimit" - keyDstInstances = "destinationInstances" - keyDstServices = "destinationServices" + keySourceRoute = "sourceRoute" + keyDstRoute = "destinationRoute" + keyDstRateLimit = "destinationRateLimit" + keyDstInstances = "destinationInstances" + keyDstServices = "destinationServices" + keyDstNearByRouteRule = "destinationNearByRouteRule" ) // ContextKey 上下文标识 diff --git a/pkg/model/core.go b/pkg/model/core.go index f766fd37..1e2de28a 100644 --- a/pkg/model/core.go +++ b/pkg/model/core.go @@ -39,6 +39,8 @@ const ( EventCircuitBreaker EventType = 0x2006 // EventFaultDetect 探测规则 EventFaultDetect EventType = 0x2007 + //EventNearbyRouteRule 就近路由事件 + EventNearbyRouteRule EventType = 0x2008 ) // RegistryValue 存储于sdk缓存中的对象,包括服务实例和服务路由 @@ -66,21 +68,23 @@ func (e EventType) String() string { var ( // 路由规则到日志回显 eventTypeToPresent = map[EventType]string{ - EventInstances: "instance", - EventRouting: "routing", - EventRateLimiting: "rate_limiting", - EventServices: "services", - EventCircuitBreaker: "circuit_breaker", - EventFaultDetect: "fault_detect", + EventInstances: "instance", + EventRouting: "routing", + EventRateLimiting: "rate_limiting", + EventServices: "services", + EventCircuitBreaker: "circuit_breaker", + EventFaultDetect: "fault_detect", + EventNearbyRouteRule: "nearby_route_rule", } presentToEventType = map[string]EventType{ - "instance": EventInstances, - "routing": EventRouting, - "rate_limiting": EventRateLimiting, - "services": EventServices, - "circuit_breaker": EventCircuitBreaker, - "fault_detect": EventFaultDetect, + "instance": EventInstances, + "routing": EventRouting, + "rate_limiting": EventRateLimiting, + "services": EventServices, + "circuit_breaker": EventCircuitBreaker, + "fault_detect": EventFaultDetect, + "nearby_route_rule": EventNearbyRouteRule, } ) diff --git a/pkg/model/pb/routing.go b/pkg/model/pb/routing.go index 79f7c99b..73da3356 100644 --- a/pkg/model/pb/routing.go +++ b/pkg/model/pb/routing.go @@ -23,6 +23,7 @@ import ( apimodel "github.com/polarismesh/specification/source/go/api/v1/model" apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" + wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" "github.com/polarismesh/polaris-go/pkg/model" ) @@ -43,6 +44,18 @@ func (r *RoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (pr if nil != routingValue { revision = routingValue.GetRevision().GetValue() } + + if resp.Routing == nil && len(resp.NearbyRouteRules) >= 1 { + rule := resp.NearbyRouteRules[0] + routing := &apitraffic.Routing{ + Namespace: wrapperspb.String(rule.GetNamespace()), + Service: resp.Service.Name, + Rules: resp.NearbyRouteRules, + } + + return routing, rule.GetRevision() + } + return routingValue, revision } diff --git a/pkg/model/pb/rule.go b/pkg/model/pb/rule.go index 610e47ec..8bdaf786 100644 --- a/pkg/model/pb/rule.go +++ b/pkg/model/pb/rule.go @@ -39,10 +39,11 @@ type ServiceRuleAssistant interface { } var eventTypeToAssistant = map[model.EventType]ServiceRuleAssistant{ - model.EventRouting: &RoutingAssistant{}, - model.EventRateLimiting: &RateLimitingAssistant{}, - model.EventCircuitBreaker: &CircuitBreakAssistant{}, - model.EventFaultDetect: &FaultDetectAssistant{}, + model.EventRouting: &RoutingAssistant{}, + model.EventRateLimiting: &RateLimitingAssistant{}, + model.EventCircuitBreaker: &CircuitBreakAssistant{}, + model.EventFaultDetect: &FaultDetectAssistant{}, + model.EventNearbyRouteRule: &RoutingAssistant{}, } // ServiceRuleInProto 路由规则配置对象. diff --git a/pkg/model/pb/validate.go b/pkg/model/pb/validate.go index 76897dd5..cae52692 100644 --- a/pkg/model/pb/validate.go +++ b/pkg/model/pb/validate.go @@ -29,21 +29,23 @@ import ( var ( eventTypeToProtoRequestType = map[model.EventType]apiservice.DiscoverRequest_DiscoverRequestType{ - model.EventInstances: apiservice.DiscoverRequest_INSTANCE, - model.EventRouting: apiservice.DiscoverRequest_ROUTING, - model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT, - model.EventServices: apiservice.DiscoverRequest_SERVICES, - model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER, - model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR, + model.EventInstances: apiservice.DiscoverRequest_INSTANCE, + model.EventRouting: apiservice.DiscoverRequest_ROUTING, + model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT, + model.EventServices: apiservice.DiscoverRequest_SERVICES, + model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER, + model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR, + model.EventNearbyRouteRule: apiservice.DiscoverRequest_NEARBY_ROUTE_RULE, } protoRespTypeToEventType = map[apiservice.DiscoverResponse_DiscoverResponseType]model.EventType{ - apiservice.DiscoverResponse_INSTANCE: model.EventInstances, - apiservice.DiscoverResponse_ROUTING: model.EventRouting, - apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting, - apiservice.DiscoverResponse_SERVICES: model.EventServices, - apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker, - apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect, + apiservice.DiscoverResponse_INSTANCE: model.EventInstances, + apiservice.DiscoverResponse_ROUTING: model.EventRouting, + apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting, + apiservice.DiscoverResponse_SERVICES: model.EventServices, + apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker, + apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect, + apiservice.DiscoverResponse_NEARBY_ROUTE_RULE: model.EventNearbyRouteRule, } ) diff --git a/pkg/plugin/localregistry/localregistry.go b/pkg/plugin/localregistry/localregistry.go index 840d06ae..884d7867 100644 --- a/pkg/plugin/localregistry/localregistry.go +++ b/pkg/plugin/localregistry/localregistry.go @@ -126,6 +126,10 @@ type RuleRegistry interface { GetServiceRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule // LoadServiceRouteRule 非阻塞发起配置加载 LoadServiceRouteRule(key *model.ServiceKey) (*common.Notifier, error) + // GetServiceNearByRouteRule 非阻塞获取就近路由信息 + GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule + // LoadServiceNearByRouteRule 非阻塞发起就近路由加载 + LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error) // GetServiceRateLimitRule 非阻塞获取限流规则 GetServiceRateLimitRule(key *model.ServiceKey, includeCache bool) model.ServiceRule // LoadServiceRateLimitRule 非阻塞发起限流规则加载 diff --git a/plugin/localregistry/inmemory/inmemory.go b/plugin/localregistry/inmemory/inmemory.go index 13408141..9417020a 100644 --- a/plugin/localregistry/inmemory/inmemory.go +++ b/plugin/localregistry/inmemory/inmemory.go @@ -161,6 +161,7 @@ func (g *LocalCache) Init(ctx *plugin.InitContext) error { g.eventToCacheHandlers = make(map[model.EventType]CacheHandlers, 0) g.eventToCacheHandlers[model.EventInstances] = g.newServiceCacheHandler() g.eventToCacheHandlers[model.EventRouting] = g.newRuleCacheHandler() + g.eventToCacheHandlers[model.EventNearbyRouteRule] = g.newNearByRouteRuleCacheHandler() g.eventToCacheHandlers[model.EventRateLimiting] = g.newRateLimitCacheHandler() g.eventToCacheHandlers[model.EventCircuitBreaker] = g.newCircuitBreakerCacheHandler() g.eventToCacheHandlers[model.EventFaultDetect] = g.newFaultDetectCacheHandler() @@ -654,6 +655,14 @@ func (g *LocalCache) GetServiceRouteRule(key *model.ServiceKey, includeCache boo return svcRule } +// GetServiceNearByRouteRule 非阻塞获取就近路由信息 +func (g *LocalCache) GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule { + svcEventKey := poolGetSvcEventKey(key, model.EventNearbyRouteRule) + svcRule := g.GetServiceRule(svcEventKey, includeCache) + poolPutSvcEventKey(svcEventKey) + return svcRule +} + // GetServicesByMeta 非阻塞获取服务列表 func (g *LocalCache) GetServicesByMeta(key *model.ServiceKey, includeCache bool) model.Services { svcEventKey := poolGetSvcEventKey(key, model.EventServices) @@ -725,6 +734,15 @@ func (g *LocalCache) newRuleCacheHandler() CacheHandlers { } } +// 创建就近路由规则缓存操作回调集合 +func (g *LocalCache) newNearByRouteRuleCacheHandler() CacheHandlers { + return CacheHandlers{ + CompareMessage: compareResource, + MessageToCacheValue: messageToServiceRule, + OnEventDeleted: g.deleteRule, + } +} + // 创建限流规则缓存操作回调集合 func (g *LocalCache) newRateLimitCacheHandler() CacheHandlers { return CacheHandlers{ @@ -1011,6 +1029,17 @@ func (g *LocalCache) LoadServices(key *model.ServiceKey) (*common.Notifier, erro }) } +// LoadServiceNearByRouteRule 非阻塞发起配置加载 +func (g *LocalCache) LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error) { + return g.LoadServiceRule(&model.ServiceEventKey{ + ServiceKey: model.ServiceKey{ + Namespace: key.Namespace, + Service: key.Service, + }, + Type: model.EventNearbyRouteRule, + }) +} + // LoadServiceRateLimitRule 非阻塞发起限流规则加载 func (g *LocalCache) LoadServiceRateLimitRule(key *model.ServiceKey) (*common.Notifier, error) { return g.LoadServiceRule(&model.ServiceEventKey{ diff --git a/plugin/servicerouter/nearbybase/nearby.go b/plugin/servicerouter/nearbybase/nearby.go index 3289f8c4..488267b6 100644 --- a/plugin/servicerouter/nearbybase/nearby.go +++ b/plugin/servicerouter/nearbybase/nearby.go @@ -20,6 +20,8 @@ package nearbybase import ( "context" "fmt" + "github.com/modern-go/reflect2" + apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" "strings" "time" @@ -94,7 +96,29 @@ const ( // Enable 当前是否需要启动该服务路由插件 func (g *NearbyBasedInstancesFilter) Enable(routeInfo *servicerouter.RouteInfo, clusters model.ServiceClusters) bool { location := g.valueCtx.GetCurrentLocation().GetLocation() - return nil != location && clusters.IsNearbyEnabled() + return nil != location && (clusters.IsNearbyEnabled() || g.enableNearByRouteRules(routeInfo)) +} + +func ruleEmpty(svcRule model.ServiceRule) bool { + return reflect2.IsNil(svcRule) || reflect2.IsNil(svcRule.GetValue()) || svcRule.GetValidateError() != nil +} + +func (g *NearbyBasedInstancesFilter) enableNearByRouteRules(routeInfo *servicerouter.RouteInfo) bool { + if ruleEmpty(routeInfo.DestRouteRule) { + return false + } + + rt, ok := routeInfo.DestRouteRule.GetValue().(*apitraffic.Routing) + if !ok { + return false + } + for _, rule := range rt.Rules { + if rule.Enable { + return true + } + } + + return false } // 一个匹配级别的cluster的健康和全部实例数量