Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v0.31 Cherry picked back missing v0.31.1 PR - KNP agent should pick the larger of the two server count #720

Merged
merged 4 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ type GrpcProxyAgentOptions struct {
// Enables updating the server count by counting the number of valid leases
// matching the selector.
CountServerLeases bool
// Namespace where lease objects are managed.
LeaseNamespace string
// Labels on which lease objects are managed.
LeaseLabel string
// ServerCountSource describes how server counts should be combined.
ServerCountSource string
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
// Content type of requests sent to apiserver.
Expand All @@ -104,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
XfrChannelSize: o.XfrChannelSize,
ServerCountSource: o.ServerCountSource,
}
}

Expand Down Expand Up @@ -132,6 +139,9 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
return flags
Expand Down Expand Up @@ -159,6 +169,10 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
}
Expand Down Expand Up @@ -216,6 +230,18 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}
// Validate labels provided.
if o.CountServerLeases {
_, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}
}
if o.ServerCountSource != "" {
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
}
}

return nil
}
Expand Down Expand Up @@ -263,6 +289,9 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
SyncForever: false,
XfrChannelSize: 150,
CountServerLeases: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
ServerCountSource: "default",
KubeconfigPath: "",
APIContentType: runtime.ContentTypeProtobuf,
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/agent/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func TestValidate(t *testing.T) {
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
"ServerCountSource": {
fieldMap: map[string]interface{}{"ServerCountSource": "foobar"},
expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"),
},
} {
t.Run(desc, func(t *testing.T) {
testAgentOptions := NewGrpcProxyAgentOptions()
Expand Down
5 changes: 2 additions & 3 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (

const (
ReadHeaderTimeout = 60 * time.Second
LeaseNamespace = "kube-system"
LeaseInformerResync = time.Second * 10
)

Expand Down Expand Up @@ -163,11 +162,11 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
}
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync)
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, o.LeaseNamespace, LeaseInformerResync)
go leaseInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
serverLeaseCounter := agent.NewServerLeaseCounter(
clock.RealClock{},
leaseLister,
Expand Down
18 changes: 18 additions & 0 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ type ProxyRunOptions struct {

// Lease controller configuration
EnableLeaseController bool
// Lease Namespace
LeaseNamespace string
// Lease Labels
LeaseLabel string
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -146,6 +150,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -184,6 +190,9 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
klog.V(1).Infof("EnableLeaseController set to %v.\n", o.EnableLeaseController)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
}
Expand Down Expand Up @@ -321,6 +330,13 @@ func (o *ProxyRunOptions) Validate() error {
}
}
}
// Validate labels provided.
if o.EnableLeaseController {
_, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -361,6 +377,8 @@ func NewProxyRunOptions() *ProxyRunOptions {
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
EnableLeaseController: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
}
return &o
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const (
LeaseDuration = 30 * time.Second
LeaseRenewalInterval = 15 * time.Second
LeaseGCInterval = 15 * time.Second
LeaseNamespace = "kube-system"
)

func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
Expand Down Expand Up @@ -156,6 +155,11 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
}
defer p.agentServer.Stop()

labels, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}

if o.EnableLeaseController {
leaseController := leases.NewController(
k8sClient,
Expand All @@ -164,8 +168,8 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
LeaseRenewalInterval,
LeaseGCInterval,
fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
LeaseNamespace,
map[string]string{"k8s-app": "konnectivity-server"},
o.LeaseNamespace,
labels,
)
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
leaseController.Run(ctx)
Expand Down
49 changes: 42 additions & 7 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
)

const (
fromResponses = "KNP server response headers"
fromLeases = "KNP lease count"
fromFallback = "fallback to 1"
)

// ClientSet consists of clients connected to each instance of an HA proxy server.
type ClientSet struct {
mu sync.Mutex //protects the clients.
Expand All @@ -39,7 +45,7 @@ type ClientSet struct {
agentID string // ID of this agent
address string // proxy server address. Assuming HA proxy server

leaseCounter *ServerLeaseCounter // counts number of proxy server leases
leaseCounter ServerCounter // counts number of proxy server leases
lastReceivedServerCount int // last server count received from a proxy server
lastServerCount int // last server count value from either lease system or proxy server, former takes priority

Expand Down Expand Up @@ -68,6 +74,7 @@ type ClientSet struct {
xfrChannelSize int

syncForever bool // Continue syncing (support dynamic server count).
serverCountSource string
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -147,7 +154,8 @@ type ClientSetConfig struct {
WarnOnChannelLimit bool
SyncForever bool
XfrChannelSize int
ServerLeaseCounter *ServerLeaseCounter
ServerLeaseCounter ServerCounter
ServerCountSource string
}

func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
Expand All @@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
leaseCounter: cc.ServerLeaseCounter,
serverCountSource: cc.ServerCountSource,
}
}

Expand Down Expand Up @@ -218,15 +227,41 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) ServerCount() int {

var serverCount int
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
} else {
serverCount = cs.lastReceivedServerCount
var countSourceLabel string

switch cs.serverCountSource {
case "", "default":
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
countSourceLabel = fromLeases
} else {
serverCount = cs.lastReceivedServerCount
countSourceLabel = fromResponses
}
case "max":
countFromLeases := 0
if cs.leaseCounter != nil {
countFromLeases = cs.leaseCounter.Count()
}
countFromResponses := cs.lastReceivedServerCount

serverCount = countFromLeases
countSourceLabel = fromLeases
if countFromResponses > serverCount {
serverCount = countFromResponses
countSourceLabel = fromResponses
}
if serverCount == 0 {
serverCount = 1
countSourceLabel = fromFallback
}

}

if serverCount != cs.lastServerCount {
klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount)
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
cs.lastServerCount = serverCount
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/agent/clientset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agent

import (
"testing"
)

type FakeServerCounter struct {
count int
}

func (f *FakeServerCounter) Count() int {
return f.count
}

func TestServerCount(t *testing.T) {
testCases := []struct{
name string
serverCountSource string
leaseCounter ServerCounter
responseCount int
want int
} {
{
name: "higher from response",
serverCountSource: "max",
responseCount: 42,
leaseCounter: &FakeServerCounter{24},
want: 42,
},
{
name: "higher from leases",
serverCountSource: "max",
responseCount: 3,
leaseCounter: &FakeServerCounter{6},
want: 6,
},
{
name: "both zero",
serverCountSource: "max",
responseCount: 0,
leaseCounter: &FakeServerCounter{0},
want: 1,
},

{
name: "response picked by default when no lease counter",
serverCountSource: "default",
responseCount: 3,
leaseCounter: nil,
want: 3,
},
{
name: "lease counter always picked when present",
serverCountSource: "default",
responseCount: 6,
leaseCounter: &FakeServerCounter{3},
want: 3,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

cs := &ClientSet{
clients: make(map[string]*Client),
leaseCounter: tc.leaseCounter,
serverCountSource: tc.serverCountSource,

}
cs.lastReceivedServerCount = tc.responseCount
if got := cs.ServerCount(); got != tc.want {
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
}
})
}

}
4 changes: 4 additions & 0 deletions pkg/agent/lease_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
)

type ServerCounter interface {
Count() int
}

// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
// current proxy server count.
type ServerLeaseCounter struct {
Expand Down
Loading
Loading