Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly support adding/removing resources and getting updates from them #6

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type Response interface {
// Get the version in the Response.
GetVersion() (string, error)

GetResourceNames() []string

// Get the context provided during response creation.
GetContext() context.Context
}
Expand Down Expand Up @@ -122,6 +124,9 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// Names of the resources included in the response
ResourceNames []string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down Expand Up @@ -171,6 +176,8 @@ type PassthroughResponse struct {
// The discovery response that needs to be sent as is, without any marshaling transformations.
DiscoveryResponse *discovery.DiscoveryResponse

ResourceNames []string

ctx context.Context
}

Expand Down Expand Up @@ -227,6 +234,10 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
return marshaledResponse.(*discovery.DiscoveryResponse), nil
}

func (r *RawResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
// We can do this because the marshaled response does not change across the calls.
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
Expand Down Expand Up @@ -330,6 +341,11 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon
return r.DiscoveryResponse, nil
}

// GetResourceNames returns the list of resources included within the response
func (r *PassthroughResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.
func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
return r.DeltaDiscoveryResponse, nil
Expand Down
81 changes: 61 additions & 20 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type watches = map[chan Response]struct{}
type watches = map[ResponseWatch]struct{}

// LinearCache supports collections of opaque resources. This cache has a
// single collection indexed by resource names and manages resource versions
Expand Down Expand Up @@ -113,45 +113,57 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
return out
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
func (cache *LinearCache) respond(req *Request, value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
var resourceNames []string
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
resourceNames = make([]string, 0, len(cache.resources))
for name, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceNames = append(resourceNames, name)
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
resourceNames = make([]string, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceNames = append(resourceNames, name)
}
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Request: req,
Resources: resources,
ResourceNames: resourceNames,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}

func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
// de-duplicate watches that need to be responded
notifyList := make(map[chan Response][]string)
notifyList := make(map[ResponseWatch][]string)
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
// Make sure we clean the watch for ALL resources it might be associated with,
// as the channel will no longer be listened to
for _, resource := range watch.Request.ResourceNames {
delete(cache.watches[resource], watch)
}
}
delete(cache.watches, name)
}
for value, stale := range notifyList {
cache.respond(value, stale)

for watch, stale := range notifyList {
cache.respond(watch.Request, watch.Response, stale)
}
for value := range cache.watchAll {
cache.respond(value, nil)
for watch := range cache.watchAll {
cache.respond(watch.Request, watch.Response, nil)
}
cache.watchAll = make(watches)

Expand Down Expand Up @@ -307,7 +319,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
staleResources := []string{} // empty means all
var staleResources []string // empty means all

// strip version prefix if it is present
var lastVersion uint64
Expand All @@ -324,45 +336,74 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
if err != nil {
stale = true
staleResources = request.ResourceNames
if cache.log != nil {
cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error())
}
} else if len(request.ResourceNames) == 0 {
stale = lastVersion != cache.version
if cache.log != nil {
cache.log.Debugf("Watch is stale as version differs for wildcard watch")
}
} else {
// Non wildcard case, we only reply resources that have effectively changed since the version set in the request
// This is used for instance in EDS
for _, name := range request.ResourceNames {
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
// The resource does not exist currently, we won't reply for it
if resourceVersion, ok := cache.versionVector[name]; !ok {
continue
} else if lastVersion < resourceVersion {
// When a resource is removed, its version defaults 0 and it is not considered stale.
stale = true
staleResources = append(staleResources, name)
} else if _, ok := streamState.GetResourceVersions()[name]; !ok {
// Resource is not currently known by the client (e.g. a resource is added in the resourceNames)
stale = true
staleResources = append(staleResources, name)
}
}
if cache.log != nil && stale {
cache.log.Debugf("Watch is stale with stale resources %v", staleResources)
}
}

if stale {
cache.respond(value, staleResources)
cache.respond(request, value, staleResources)
return nil
}

// Create open watches since versions are up to date.
watch := ResponseWatch{request, value}
if len(request.ResourceNames) == 0 {
cache.watchAll[value] = struct{}{}
if cache.log != nil {
cache.log.Infof("[linear cache] open watch for %s all resources, system version %q",
cache.typeURL, cache.getVersion())
}
cache.watchAll[watch] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
delete(cache.watchAll, watch)
}
}
if cache.log != nil {
cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q",
cache.typeURL, request.ResourceNames, cache.getVersion())
}
for _, name := range request.ResourceNames {
set, exists := cache.watches[name]
if !exists {
set = make(watches)
cache.watches[name] = set
}
set[value] = struct{}{}
set[watch] = struct{}{}
}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.ResourceNames {
set, exists := cache.watches[name]
if exists {
delete(set, value)
delete(set, watch)
}
if len(set) == 0 {
delete(cache.watches, name)
Expand Down
Loading