Skip to content

Commit

Permalink
Select alive only node when fallbackTag is given
Browse files Browse the repository at this point in the history
- Apply to random and roundrobin strategy
- Require observatory config

Co-authored-by: Mark Ma <[email protected]>
  • Loading branch information
yuhan6665 and mkmark committed May 5, 2024
1 parent eba2906 commit 84eeb56
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 5 deletions.
49 changes: 48 additions & 1 deletion app/router/balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
sync "sync"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
"github.com/xtls/xray-core/features/outbound"
)
Expand All @@ -17,14 +20,58 @@ type BalancingPrincipleTarget interface {
}

type RoundRobinStrategy struct {
FallbackTag string

ctx context.Context
observatory extension.Observatory
mu sync.Mutex
index int
}

func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx
}

func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string {
return strings
}

func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
if len(s.FallbackTag) > 0 && s.observatory == nil {
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory
return nil
}))
}
if s.observatory != nil {
observeReport, err := s.observatory.GetObservation(s.ctx)
if err == nil {
aliveTags := make([]string, 0)
if result, ok := observeReport.(*observatory.ObservationResult); ok {
status := result.Status
statusMap := make(map[string]*observatory.OutboundStatus)
for _, outboundStatus := range status {
statusMap[outboundStatus.OutboundTag] = outboundStatus
}
for _, candidate := range tags {
if outboundStatus, found := statusMap[candidate]; found {
if outboundStatus.Alive {
aliveTags = append(aliveTags, candidate)
}
} else {
// unfound candidate is considered alive
aliveTags = append(aliveTags, candidate)
}
}
tags = aliveTags
}
}
}

n := len(tags)
if n == 0 {
panic("0 tags")
// goes to fallbackTag
return ""
}

s.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions app/router/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch
case "roundrobin":
return &Balancer{
selectors: br.OutboundSelector,
strategy: &RoundRobinStrategy{},
strategy: &RoundRobinStrategy{FallbackTag: br.FallbackTag},
fallbackTag: br.FallbackTag,
ohm: ohm,
}, nil
Expand All @@ -162,7 +162,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch
selectors: br.OutboundSelector,
ohm: ohm,
fallbackTag: br.FallbackTag,
strategy: &RandomStrategy{},
strategy: &RandomStrategy{FallbackTag: br.FallbackTag},
}, nil
default:
return nil, newError("unrecognized balancer type")
Expand Down
48 changes: 47 additions & 1 deletion app/router/strategy_random.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,63 @@
package router

import (
"context"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
)

// RandomStrategy represents a random balancing strategy
type RandomStrategy struct{}
type RandomStrategy struct{
FallbackTag string

ctx context.Context
observatory extension.Observatory
}

func (s *RandomStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx
}

func (s *RandomStrategy) GetPrincipleTarget(strings []string) []string {
return strings
}

func (s *RandomStrategy) PickOutbound(candidates []string) string {
if len(s.FallbackTag) > 0 && s.observatory == nil {
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory
return nil
}))
}
if s.observatory != nil {
observeReport, err := s.observatory.GetObservation(s.ctx)
if err == nil {
aliveTags := make([]string, 0)
if result, ok := observeReport.(*observatory.ObservationResult); ok {
status := result.Status
statusMap := make(map[string]*observatory.OutboundStatus)
for _, outboundStatus := range status {
statusMap[outboundStatus.OutboundTag] = outboundStatus
}
for _, candidate := range candidates {
if outboundStatus, found := statusMap[candidate]; found {
if outboundStatus.Alive {
aliveTags = append(aliveTags, candidate)
}
} else {
// unfound candidate is considered alive
aliveTags = append(aliveTags, candidate)
}
}
candidates = aliveTags
}
}
}

count := len(candidates)
if count == 0 {
// goes to fallbackTag
Expand Down
4 changes: 3 additions & 1 deletion infra/conf/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func TestRouterConfig(t *testing.T) {
"balancers": [
{
"tag": "b1",
"selector": ["test"]
"selector": ["test"],
"fallbackTag": "fall"
},
{
"tag": "b2",
Expand Down Expand Up @@ -137,6 +138,7 @@ func TestRouterConfig(t *testing.T) {
Tag: "b1",
OutboundSelector: []string{"test"},
Strategy: "random",
FallbackTag: "fall",
},
{
Tag: "b2",
Expand Down

0 comments on commit 84eeb56

Please sign in to comment.