Skip to content

Commit

Permalink
NET-5338 - NET-5338 - Run a v2 mode xds server (#18579)
Browse files Browse the repository at this point in the history
* NET-5338 - NET-5338 - Run a v2 mode xds server

* fix linting
  • Loading branch information
jmurret authored Aug 24, 2023
1 parent 2225bf0 commit 051f250
Show file tree
Hide file tree
Showing 33 changed files with 921 additions and 317 deletions.
44 changes: 39 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"encoding/json"
"errors"
"fmt"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
"github.com/hashicorp/consul/lib/stringslice"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -54,7 +57,6 @@ import (
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
Expand Down Expand Up @@ -908,13 +910,37 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
// useV2Resources returns true if "resource-apis" is present in the Experiments
// array of the agent config.
func (a *Agent) useV2Resources() bool {
if stringslice.Contains(a.baseDeps.Experiments, consul.CatalogResourceExperimentName) {
return true
}
return false
}

// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.useV2Resources() {
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.proxyConfig.Logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer() {
cfg := a.getProxyWatcher()

// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
var cfg xds.ProxyConfigSource = localproxycfg.NewConfigSource(a.proxyConfig)
if server, ok := a.delegate.(*consul.Server); ok {
catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{
NodeName: a.config.NodeName,
Expand All @@ -941,6 +967,14 @@ func (a *Agent) listenAndServeGRPC() error {
a,
)
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}

a.configureXDSServer()

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down
73 changes: 73 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
mathrand "math/rand"
"net"
"net/http"
Expand All @@ -22,6 +27,7 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -6358,6 +6364,73 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}

func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)

bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}

bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})

cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)

bd.Experiments = tc.getExperiments()

agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}

}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
4 changes: 1 addition & 3 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,7 @@ func DevSource() Source {
ports = {
grpc = 8502
}
experiments = [
"resource-apis"
]
experiments = []
`,
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
rt.DevMode = true
rt.DisableAnonymousSignature = true
rt.DisableKeyringFile = true
rt.Experiments = []string{"resource-apis"}
rt.Experiments = nil
rt.EnableDebug = true
rt.UIConfig.Enabled = true
rt.LeaveOnTerm = false
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ const (

LeaderTransferMinVersion = "1.6.0"

catalogResourceExperimentName = "resource-apis"
CatalogResourceExperimentName = "resource-apis"
)

const (
Expand Down Expand Up @@ -874,7 +874,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
}

func (s *Server) registerControllers(deps Deps) {
if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) {
if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
TrustBundleFetcher: func() (*pbproxystate.TrustBundle, error) {
Expand Down
31 changes: 0 additions & 31 deletions agent/proxy-tracker/mock_Logger.go

This file was deleted.

45 changes: 24 additions & 21 deletions agent/proxy-tracker/proxy_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package proxytracker
import (
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"sync"

"github.com/hashicorp/consul/internal/controller"
Expand All @@ -14,7 +15,6 @@ import (

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

Expand All @@ -35,9 +35,9 @@ func (e *ProxyConnection) Key() string {
// when the ProxyState for that proxyID has changed.
type proxyWatchData struct {
// notifyCh is the channel that the watcher receives updates from ProxyTracker.
notifyCh chan *pbmesh.ProxyState
notifyCh chan proxycfg.ProxySnapshot
// state is the current/last updated ProxyState for a given proxy.
state *pbmesh.ProxyState
state *mesh.ProxyState
// token is the ACL token provided by the watcher.
token string
// nodeName is the node where the given proxy resides.
Expand All @@ -46,7 +46,7 @@ type proxyWatchData struct {

type ProxyTrackerConfig struct {
// logger will be used to write log messages.
Logger Logger
Logger hclog.Logger

// sessionLimiter is used to enforce xDS concurrency limits.
SessionLimiter SessionLimiter
Expand Down Expand Up @@ -87,10 +87,10 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker {
// Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates,
// a channel to notify of xDS terminated session, and a cancel function to cancel the watch.
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
nodeName string, token string) (<-chan *pbmesh.ProxyState,
nodeName string, token string) (<-chan proxycfg.ProxySnapshot,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {

if err := validateArgs(proxyID, nodeName, token); err != nil {
pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName)
if err := pt.validateWatchArgs(proxyID, nodeName); err != nil {
pt.config.Logger.Error("args failed validation", err)
return nil, nil, nil, err
}
Expand All @@ -105,7 +105,8 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,

// This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one.
proxyStateChan := make(chan *pbmesh.ProxyState, 1)

proxyStateChan := make(chan proxycfg.ProxySnapshot, 1)
watchData := &proxyWatchData{
notifyCh: proxyStateChan,
state: nil,
Expand All @@ -128,9 +129,11 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
//Send an event to the controller
err = pt.notifyNewProxyChannel(proxyID)
if err != nil {
pt.config.Logger.Error("failed to notify controller of new proxy connection", err)
pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session)
return nil, nil, nil, err
}
pt.config.Logger.Trace("controller notified of watch created", "proxyID", proxyID, "nodeName", nodeName)

return proxyStateChan, session.Terminated(), cancel, nil
}
Expand Down Expand Up @@ -163,34 +166,37 @@ func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error {
// - ends the session with xDS session limiter.
// - closes the proxy state channel assigned to the proxy.
// This function assumes the state lock is already held.
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan *pbmesh.ProxyState, session limiter.Session) {
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxycfg.ProxySnapshot, session limiter.Session) {
delete(pt.proxies, proxyReferenceKey)
session.End()
close(proxyStateChan)
pt.config.Logger.Trace("watch cancelled", "proxyReferenceKey", proxyReferenceKey)
}

func validateArgs(proxyID *pbresource.ID,
nodeName string, token string) error {
// validateWatchArgs checks the proxyIDand nodeName passed to Watch
// and returns an error if the args are not properly constructed.
func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID,
nodeName string) error {
if proxyID == nil {
return errors.New("proxyID is required")
} else if proxyID.Type.Kind != mesh.ProxyStateTemplateConfigurationType.Kind {
} else if proxyID.GetType().GetKind() != mesh.ProxyStateTemplateConfigurationType.Kind {
return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind())
} else if nodeName == "" {
return errors.New("nodeName is required")
} else if token == "" {
return errors.New("token is required")
}

return nil
}

// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState) error {
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.ProxyState) error {
pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID)
proxyReferenceKey := resource.NewReferenceKey(proxyID)
pt.mu.Lock()
defer pt.mu.Unlock()
if data, ok := pt.proxies[proxyReferenceKey]; ok {
data.state = proxyState

pt.deliverLatest(proxyID, proxyState, data.notifyCh)
} else {
return errors.New("proxyState change could not be sent because proxy is not connected")
Expand All @@ -199,7 +205,8 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.Pr
return nil
}

func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState, ch chan *pbmesh.ProxyState) {
func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *mesh.ProxyState, ch chan proxycfg.ProxySnapshot) {
pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID)
// Send if chan is empty
select {
case ch <- proxyState:
Expand Down Expand Up @@ -254,6 +261,7 @@ func (pt *ProxyTracker) ProxyConnectedToServer(proxyID *pbresource.ID) bool {

// Shutdown removes all state and close all channels.
func (pt *ProxyTracker) Shutdown() {
pt.config.Logger.Info("proxy tracker shutdown initiated")
pt.mu.Lock()
defer pt.mu.Unlock()

Expand All @@ -271,8 +279,3 @@ func (pt *ProxyTracker) Shutdown() {
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}

//go:generate mockery --name Logger --inpackage
type Logger interface {
Error(args ...any)
}
Loading

0 comments on commit 051f250

Please sign in to comment.