monitor.go
package v2

import (
	"context"
	"math"
	"sync"

	"github.com/Azure/azure-container-networking/cns"
	"github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1"
	"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
	"github.com/pkg/errors"
	"go.uber.org/zap"
)

const (
	// DefaultMaxIPs is the default maximum number of allocatable IPs on a k8s Node.
	DefaultMaxIPs = 250
	// fieldManager is the field manager used when patching the NodeNetworkConfig.
	fieldManager = "azure-cns"
)

type nodeNetworkConfigSpecUpdater interface {
	PatchSpec(context.Context, *v1alpha.NodeNetworkConfigSpec, string) (*v1alpha.NodeNetworkConfig, error)
}

type ipStateStore interface {
	GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus
	MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error)
}

type scaler struct {
	batch     int64
	buffer    float64
	exhausted bool
	max       int64
}

type Monitor struct {
	z                     *zap.Logger
	scaler                scaler
	nnccli                nodeNetworkConfigSpecUpdater
	store                 ipStateStore
	demand                int64
	request               int64
	demandSource          <-chan int
	cssSource             <-chan v1alpha1.ClusterSubnetState
	nncSource             <-chan v1alpha.NodeNetworkConfig
	started               chan interface{}
	once                  sync.Once
	legacyMetricsObserver func(context.Context) error
}
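
// NewMonitor constructs a Monitor that reconciles the requested IP pool size from
// three inputs: IP demand updates, ClusterSubnetState exhaustion updates, and
// NodeNetworkConfig updates. A minimal wiring sketch (illustrative only; logger,
// store, nnccli, nnc, and ctx stand in for whatever concrete values the caller has):
//
//	demandCh := make(chan int)
//	cssCh := make(chan v1alpha1.ClusterSubnetState)
//	nncCh := make(chan v1alpha.NodeNetworkConfig)
//	pm := NewMonitor(logger, store, nnccli, demandCh, nncCh, cssCh)
//	go func() { _ = pm.Start(ctx) }()
//	nncCh <- nnc // Start does not reconcile until the first NodeNetworkConfig arrives.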
func NewMonitor(z *zap.Logger, store ipStateStore, nnccli nodeNetworkConfigSpecUpdater, demandSource <-chan int, nncSource <-chan v1alpha.NodeNetworkConfig, cssSource <-chan v1alpha1.ClusterSubnetState) *Monitor { //nolint:lll // it's fine
	return &Monitor{
		z:                     z.With(zap.String("component", "ipam-pool-monitor")),
		store:                 store,
		nnccli:                nnccli,
		demandSource:          demandSource,
		cssSource:             cssSource,
		nncSource:             nncSource,
		started:               make(chan interface{}),
		legacyMetricsObserver: func(context.Context) error { return nil },
	}
}

// Start begins the Monitor's pool reconcile loop.
// On the first run, it will block until a NodeNetworkConfig is received on its NNC source channel.
// Subsequently, it reconciles the pool whenever it receives an update on any of its
// source channels (demand, ClusterSubnetState, or NodeNetworkConfig).
func (pm *Monitor) Start(ctx context.Context) error {
	pm.z.Debug("starting")
	for {
		// proceed when things happen:
		select {
		case <-ctx.Done(): // calling context has closed, we'll exit.
			return errors.Wrap(ctx.Err(), "pool monitor context closed")
		case demand := <-pm.demandSource: // updated demand for IPs, recalculate request
			pm.demand = int64(demand)
			pm.z.Info("demand update", zap.Int64("demand", pm.demand))
		case css := <-pm.cssSource: // received an updated ClusterSubnetState, recalculate request
			pm.scaler.exhausted = css.Status.Exhausted
			pm.z.Info("exhaustion update", zap.Bool("exhausted", pm.scaler.exhausted))
		case nnc := <-pm.nncSource: // received a new NodeNetworkConfig, extract the data from it and recalculate request
			pm.scaler.max = int64(math.Min(float64(nnc.Status.Scaler.MaxIPCount), DefaultMaxIPs))
			pm.scaler.batch = int64(math.Min(math.Max(float64(nnc.Status.Scaler.BatchSize), 1), float64(pm.scaler.max)))
			pm.scaler.buffer = math.Abs(float64(nnc.Status.Scaler.RequestThresholdPercent)) / 100 //nolint:gomnd // it's a percentage
			pm.once.Do(func() {
				pm.request = nnc.Spec.RequestedIPCount
				close(pm.started) // close the init channel the first time we fully receive a NodeNetworkConfig.
				pm.z.Debug("started", zap.Int64("initial request", pm.request))
			})
			pm.z.Info("scaler update", zap.Int64("batch", pm.scaler.batch), zap.Float64("buffer", pm.scaler.buffer), zap.Int64("max", pm.scaler.max), zap.Int64("request", pm.request))
		}
		select {
		case <-pm.started: // this blocks until we have initialized
		default:
			// if we haven't started yet, we need to wait for the first NNC to be received.
			continue // jumps to the next iteration of the outer for-loop
		}
		// if control has flowed through the select(s) to this point, we can now reconcile.
		if err := pm.reconcile(ctx); err != nil {
			pm.z.Error("reconcile failed", zap.Error(err))
		}
		if err := pm.legacyMetricsObserver(ctx); err != nil {
			pm.z.Error("legacy metrics observer failed", zap.Error(err))
		}
	}
}

func (pm *Monitor) reconcile(ctx context.Context) error {
	// if the subnet is exhausted, locally overwrite the batch/minfree/maxfree in the meta copy for this iteration
	// (until the controlplane owns this and modifies the scaler values for us directly instead of writing "exhausted")
	// TODO(rbtr)
	s := pm.scaler
	if s.exhausted {
		s.batch = 1
		s.buffer = 1
	}
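	// With batch=1 and buffer=1, the scaling function below reduces to
	// target = demand + 1 (before the max clamp), so only a single spare IP is
	// requested while the subnet is exhausted.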
	// calculate the target state from the current pool state and scaler
	target := calculateTargetIPCountOrMax(pm.demand, s.batch, s.max, s.buffer)
	pm.z.Info("calculated new request", zap.Int64("demand", pm.demand), zap.Int64("batch", s.batch), zap.Int64("max", s.max), zap.Float64("buffer", s.buffer), zap.Int64("target", target))
	delta := target - pm.request
	if delta == 0 {
		pm.z.Info("NNC already at target IPs, no scaling required")
		return nil
	}
	pm.z.Info("scaling pool", zap.Int64("delta", delta))
	// try to release -delta IPs. this is a no-op if delta is positive (scale up).
	if _, err := pm.store.MarkNIPsPendingRelease(int(-delta)); err != nil {
		return errors.Wrapf(err, "failed to mark sufficient IPs as PendingRelease, wanted %d", pm.request-target)
	}
	spec := pm.buildNNCSpec(target)
	if _, err := pm.nnccli.PatchSpec(ctx, &spec, fieldManager); err != nil {
		return errors.Wrap(err, "failed to PatchSpec with NNC client")
	}
	pm.request = target
	pm.z.Info("scaled pool", zap.Int64("request", pm.request))
	return nil
}

// buildNNCSpec translates CNS's map of IPs to be released and the requested IP count into an NNC Spec.
func (pm *Monitor) buildNNCSpec(request int64) v1alpha.NodeNetworkConfigSpec {
	// get all pending-release IPs from the CNS store and report them in the Spec as not-in-use.
	pendingReleaseIPs := pm.store.GetPendingReleaseIPConfigs()
	spec := v1alpha.NodeNetworkConfigSpec{
		RequestedIPCount: request,
		IPsNotInUse:      make([]string, len(pendingReleaseIPs)),
	}
	for i := range pendingReleaseIPs {
		spec.IPsNotInUse[i] = pendingReleaseIPs[i].ID
	}
	return spec
}

// WithLegacyMetricsObserver sets an observer that is invoked after every reconcile pass.
func (pm *Monitor) WithLegacyMetricsObserver(observer func(context.Context) error) {
	pm.legacyMetricsObserver = observer
}

// calculateTargetIPCountOrMax calculates the target IP count request
// using the scaling function and clamps the result at the max IPs.
func calculateTargetIPCountOrMax(demand, batch, max int64, buffer float64) int64 {
	targetRequest := calculateTargetIPCount(demand, batch, buffer)
	if targetRequest > max {
		// clamp request at the max IPs
		targetRequest = max
	}
	return targetRequest
}

// calculateTargetIPCount calculates an IP count request based on the
// current demand, batch size, and buffer.
// ref: https://github.com/Azure/azure-container-networking/blob/master/docs/feature/ipammath/0-background.md
// the idempotent scaling function is:
// Target = Batch \times \lceil buffer + \frac{Demand}{Batch} \rceil
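// For example (illustrative numbers, not from the referenced doc): demand=25, batch=16,
// buffer=0.5 gives 16 * ceil(0.5 + 25/16) = 16 * ceil(2.0625) = 16 * 3 = 48,
// which calculateTargetIPCountOrMax then clamps at max.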
func calculateTargetIPCount(demand, batch int64, buffer float64) int64 {
	return batch * int64(math.Ceil(buffer+float64(demand)/float64(batch)))
}