Skip to content

Commit

Permalink
fix:service_contract and xdsv3 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Dec 31, 2023
1 parent 56675aa commit 9e91212
Show file tree
Hide file tree
Showing 36 changed files with 708 additions and 1,231 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version:
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
golangci:
strategy:
matrix:
go-version: [ "1.20" ]
go-version: [ "1.21.5" ]
name: golangci-lint
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Get version
id: get_version
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/standalone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Build
id: build
Expand Down
130 changes: 31 additions & 99 deletions apiserver/xdsserverv3/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ package cache
import (
"context"
"errors"
"fmt"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"go.uber.org/zap"

Expand All @@ -39,7 +36,7 @@ type (
// hash is the hashing function for Envoy nodes
hash cachev3.NodeHash
// Muxed caches.
Caches *utils.SyncMap[string, cache.Cache]
Caches *utils.SyncMap[string, cachev3.Cache]
}

// CacheHook
Expand Down Expand Up @@ -70,7 +67,7 @@ func (sc *XDSCache) CreateWatch(request *cachev3.Request, streamState stream.Str
if sc.hook != nil {
sc.hook.OnCreateWatch(request, streamState, value)
}
item := sc.loadCache(request)
item := sc.loadCache(request, streamState)
if item == nil {
value <- nil
return func() {}
Expand All @@ -84,7 +81,7 @@ func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream
if sc.hook != nil {
sc.hook.OnCreateDeltaWatch(request, state, value)
}
item := sc.loadCache(request)
item := sc.loadCache(request, state)
if item == nil {
value <- &NoReadyXdsResponse{}
return func() {}
Expand All @@ -98,30 +95,21 @@ func (sc *XDSCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev
return nil, errors.New("not implemented")
}

// DeltaUpdateNodeResource .
func (sc *XDSCache) DeltaUpdateNodeResource(client *resource.XDSClient, key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
})
linearCache, _ := val.(*LinearCache)
return linearCache.UpdateNodeResource(client, current)
}

// DeltaUpdateResource .
func (sc *XDSCache) DeltaUpdateResource(key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log))
})
linearCache, _ := val.(*LinearCache)
linearCache, _ := val.(*cachev3.LinearCache)
return linearCache.UpdateResources(current, []string{})
}

// DeltaRemoveResource .
func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log))
})
linearCache, _ := val.(*LinearCache)
linearCache, _ := val.(*cachev3.LinearCache)

waitRemove := make([]string, 0, len(current))
for k := range current {
Expand All @@ -130,99 +118,43 @@ func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]
return linearCache.UpdateResources(nil, waitRemove)
}

func classify(typeUrl string, resources []string, client *resource.XDSClient) []string {
isAllowNode := false
_, isAllowTls := allowTlsResource[typeUrl]
if isAllowNodeFunc, exist := allowEachNodeResource[typeUrl]; exist {
isAllowNode = isAllowNodeFunc(typeUrl, resources, client)
}
first := typeUrl + "~" + client.Node.GetId()
second := typeUrl + "~" + client.GetSelfNamespace()
tlsMode, exist := client.Metadata[resource.TLSModeTag]

// 没有设置 TLS 开关
if !exist || tlsMode == string(resource.TLSModeNone) {
if isAllowNode {
return []string{first, second}
}
return []string{second}
}
if isAllowNode {
if isAllowTls {
return []string{first, second, second + "~" + tlsMode}
}
return []string{first, second}
}
if isAllowTls {
return []string{first, second, second + "~" + tlsMode}
}
return []string{second}
}

type PredicateNodeResource func(typeUrl string, resources []string, client *resource.XDSClient) bool

var (
allowEachNodeResource = map[string]PredicateNodeResource{
resourcev3.ListenerType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
return true
},
resourcev3.EndpointType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
selfSvc := fmt.Sprintf("INBOUND|%s|%s", client.GetSelfNamespace(), client.GetSelfService())
for i := range resources {
if resources[i] == selfSvc {
return true
}
}
return false
},
resourcev3.RouteType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
selfSvc := resource.MakeInBoundRouteConfigName(client.GetSelfServiceKey())
for i := range resources {
if resources[i] == selfSvc {
return true
}
}
return false
},
}
allowTlsResource = map[string]struct{}{
resourcev3.ListenerType: {},
resourcev3.ClusterType: {},
}
)

func (sc *XDSCache) loadCache(req interface{}) cachev3.Cache {
func (sc *XDSCache) loadCache(req interface{}, streamState stream.StreamState) cachev3.Cache {
var (
keys []string
typeUrl string
client *resource.XDSClient
subscribeResources []string
typeUrl string
client *resource.XDSClient
)
switch args := req.(type) {
case *cache.Request:
case *cachev3.Request:
client = resource.ParseXDSClient(args.GetNode())
subscribeResources = args.GetResourceNames()
keys = classify(args.TypeUrl, subscribeResources, client)
typeUrl = args.TypeUrl
case *cache.DeltaRequest:
case *cachev3.DeltaRequest:
client = resource.ParseXDSClient(args.GetNode())
subscribeResources = args.GetResourceNamesSubscribe()
keys = classify(args.TypeUrl, subscribeResources, client)
typeUrl = args.TypeUrl
default:
log.Error("[XDS][V3] no support client request type", zap.Any("req", args))
return nil
}
for i := range keys {
val, ok := sc.Caches.Load(keys[i])
if ok {
log.Info("[XDS][V3] load cache to handle client request", zap.Strings("keys", keys),
zap.String("hit-key", keys[i]), zap.Strings("subscribe", subscribeResources),
zap.String("type", typeUrl), zap.String("client", client.Node.GetId()))
return val
}
cacheKey := BuildCacheKey(typeUrl, client.TLSMode, client)
val, ok := sc.Caches.Load(cacheKey)
if ok {
log.Info("[XDS][V3] load cache to handle client request",
zap.String("cache-key", cacheKey), zap.String("type", typeUrl),
zap.String("client", client.Node.GetId()), zap.Bool("wildcard", streamState.IsWildcard()))
return val
}
log.Error("[XDS][V3] cache not found to handle client request", zap.String("type", typeUrl),
zap.String("client", client.Node.GetId()))
return nil
}

func BuildCacheKey(typeUrl string, tlsMode resource.TLSMode, client *resource.XDSClient) string {
xdsType := resource.FormatTypeUrl(typeUrl)
if xdsType == resource.LDS {
return typeUrl + "~" + client.GetNodeID()
}
key := typeUrl + "~" + client.GetSelfNamespace()
if resource.SupportTLS(xdsType) && resource.EnableTLS(tlsMode) {
key = key + "~" + string(tlsMode)
}
return key
}
5 changes: 1 addition & 4 deletions apiserver/xdsserverv3/cache/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ type Callbacks struct {
nodeMgr *resource.XDSNodeManager
}

func (cb *Callbacks) Report() {

}

func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
return nil
}
Expand All @@ -57,6 +53,7 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
}

func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) {
cb.nodeMgr.DelNode(id)
}

func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) error {
Expand Down
Loading

0 comments on commit 9e91212

Please sign in to comment.