|
8 | 8 | "context"
|
9 | 9 | "encoding/json"
|
10 | 10 | "fmt"
|
| 11 | + "net" |
11 | 12 | "net/http"
|
12 | 13 | "net/http/httptest"
|
13 | 14 | "reflect"
|
@@ -274,57 +275,165 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo
|
274 | 275 | return len(programmedNCs), nil
|
275 | 276 | }
|
276 | 277 |
|
277 |
| -// This API will be called by CNS RequestController on CRD update. |
278 |
| -func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode { |
279 |
| - logger.Printf("Reconciling NC state with CreateNCRequest: [%v], PodInfo [%+v], NNC: [%+v]", ncRequest, podInfoByIP, nnc) |
280 |
| - // check if ncRequest is null, then return as there is no CRD state yet |
281 |
| - if ncRequest == nil { |
| 278 | +func (service *HTTPRestService) ReconcileIPAMState(ncReqs []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode { |
| 279 | + logger.Printf("Reconciling CNS IPAM state with nc requests: [%+v], PodInfo [%+v], NNC: [%+v]", ncReqs, podInfoByIP, nnc) |
| 280 | + // if no nc reqs, there is no CRD state yet |
| 281 | + if len(ncReqs) == 0 { |
282 | 282 | logger.Printf("CNS starting with no NC state, podInfoMap count %d", len(podInfoByIP))
|
283 | 283 | return types.Success
|
284 | 284 | }
|
285 | 285 |
|
286 |
| - // If the NC was created successfully, then reconcile the assigned pod state |
287 |
| - returnCode := service.CreateOrUpdateNetworkContainerInternal(ncRequest) |
288 |
| - if returnCode != types.Success { |
289 |
| - return returnCode |
| 286 | + // first step in reconciliation is to create all the NCs in CNS, no IP assignment yet. |
| 287 | + for _, ncReq := range ncReqs { |
| 288 | + returnCode := service.CreateOrUpdateNetworkContainerInternal(ncReq) |
| 289 | + if returnCode != types.Success { |
| 290 | + return returnCode |
| 291 | + } |
290 | 292 | }
|
291 | 293 |
|
292 |
| - // now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip. |
293 |
| - for _, secIPConfig := range ncRequest.SecondaryIPConfigs { |
294 |
| - if podInfo, exists := podInfoByIP[secIPConfig.IPAddress]; exists { |
295 |
| - logger.Printf("SecondaryIP %+v is assigned to Pod. %+v, ncId: %s", secIPConfig, podInfo, ncRequest.NetworkContainerid) |
| 294 | + // index all the secondary IP configs for all the nc reqs, for easier lookup later on. |
| 295 | + allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest) |
| 296 | + for i := range ncReqs { |
| 297 | + for _, secIPConfig := range ncReqs[i].SecondaryIPConfigs { |
| 298 | + allSecIPsIdx[secIPConfig.IPAddress] = ncReqs[i] |
| 299 | + } |
| 300 | + } |
296 | 301 |
|
297 |
| - jsonContext, err := podInfo.OrchestratorContext() |
298 |
| - if err != nil { |
299 |
| - logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err) |
300 |
| - return types.UnexpectedError |
301 |
| - } |
| 302 | + // we now need to reconcile IP assignment. |
| 303 | + // considering that a single pod may have multiple ips (such as in dual stack scenarios) |
| 304 | + // and that IP assignment in CNS (as done by requestIPConfigsHelper) does not allow |
| 305 | + // updates (it returns the existing state if one already exists for the pod's interface), |
| 306 | + // we need to assign all IPs for a pod interface or name+namespace at the same time. |
| 307 | + // |
| 308 | + // iterating over single IPs is not appropriate then, since assignment for the first IP for |
| 309 | + // a pod will prevent the second IP from being added. the following function call transforms |
| 310 | + // pod info indexed by ip address: |
| 311 | + // |
| 312 | + // { |
| 313 | + // "10.0.0.1": podInfo{interface: "aaa-eth0"}, |
| 314 | + // "fe80::1": podInfo{interface: "aaa-eth0"}, |
| 315 | + // } |
| 316 | + // |
| 317 | + // to pod IPs indexed by pod key (interface or name+namespace, depending on scenario): |
| 318 | + // |
| 319 | + // { |
| 320 | + // "aaa-eth0": podIPs{v4IP: 10.0.0.1, v6IP: fe80::1} |
| 321 | + // } |
| 322 | + // |
| 323 | + // such that we can iterate over pod interfaces, and assign all IPs for it at once. |
| 324 | + podKeyToPodIPs, err := newPodKeyToPodIPsMap(podInfoByIP) |
| 325 | + if err != nil { |
| 326 | + logger.Errorf("could not transform pods indexed by IP address to pod IPs indexed by interface: %v", err) |
| 327 | + return types.UnexpectedError |
| 328 | + } |
302 | 329 |
|
303 |
| - ipconfigsRequest := cns.IPConfigsRequest{ |
304 |
| - DesiredIPAddresses: []string{secIPConfig.IPAddress}, |
305 |
| - OrchestratorContext: jsonContext, |
306 |
| - InfraContainerID: podInfo.InfraContainerID(), |
307 |
| - PodInterfaceID: podInfo.InterfaceID(), |
308 |
| - } |
| 330 | + for podKey, podIPs := range podKeyToPodIPs { |
| 331 | + var ( |
| 332 | + desiredIPs []string |
| 333 | + ncIDs []string |
| 334 | + ) |
| 335 | + |
| 336 | + var ips []net.IP |
| 337 | + if podIPs.v4IP != nil { |
| 338 | + ips = append(ips, podIPs.v4IP) |
| 339 | + } |
309 | 340 |
|
310 |
| - if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil { |
311 |
| - logger.Errorf("AllocateIPConfig failed for SecondaryIP %+v, podInfo %+v, ncId %s, error: %v", secIPConfig, podInfo, ncRequest.NetworkContainerid, err) |
312 |
| - return types.FailedToAllocateIPConfig |
| 341 | + if podIPs.v6IP != nil { |
| 342 | + ips = append(ips, podIPs.v6IP) |
| 343 | + } |
| 344 | + |
| 345 | + for _, ip := range ips { |
| 346 | + if ncReq, ok := allSecIPsIdx[ip.String()]; ok { |
| 347 | + logger.Printf("secondary ip %s is assigned to pod %+v, ncId: %s ncVersion: %s", ip, podIPs, ncReq.NetworkContainerid, ncReq.Version) |
| 348 | + desiredIPs = append(desiredIPs, ip.String()) |
| 349 | + ncIDs = append(ncIDs, ncReq.NetworkContainerid) |
| 350 | + } else { |
| 351 | + // it might still be possible to see host networking pods here (where ips are not from ncs) if we are restoring using the kube podinfo provider |
| 352 | + // todo: once kube podinfo provider reconcile flow is removed, this line will not be necessary/should be removed. |
| 353 | + logger.Errorf("ip %s assigned to pod %+v but not found in any nc", ip, podIPs) |
313 | 354 | }
|
314 |
| - } else { |
315 |
| - logger.Printf("SecondaryIP %+v is not assigned. ncId: %s", secIPConfig, ncRequest.NetworkContainerid) |
| 355 | + } |
| 356 | + |
| 357 | + if len(desiredIPs) == 0 { |
| 358 | + // this may happen for pods in the host network |
| 359 | + continue |
| 360 | + } |
| 361 | + |
| 362 | + jsonContext, err := podIPs.OrchestratorContext() |
| 363 | + if err != nil { |
| 364 | + logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err) |
| 365 | + return types.UnexpectedError |
| 366 | + } |
| 367 | + |
| 368 | + ipconfigsRequest := cns.IPConfigsRequest{ |
| 369 | + DesiredIPAddresses: desiredIPs, |
| 370 | + OrchestratorContext: jsonContext, |
| 371 | + InfraContainerID: podIPs.InfraContainerID(), |
| 372 | + PodInterfaceID: podIPs.InterfaceID(), |
| 373 | + } |
| 374 | + |
| 375 | + if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil { |
| 376 | + logger.Errorf("requestIPConfigsHelper failed for pod key %s, podInfo %+v, ncIds %v, error: %v", podKey, podIPs, ncIDs, err) |
| 377 | + return types.FailedToAllocateIPConfig |
316 | 378 | }
|
317 | 379 | }
|
318 | 380 |
|
319 |
| - err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse) |
320 |
| - if err != nil { |
| 381 | + if err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse); err != nil { |
321 | 382 | logger.Errorf("[Azure CNS] Error. Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse)
|
322 | 383 | return types.UnexpectedError
|
323 | 384 | }
|
324 | 385 |
|
325 | 386 | return 0
|
326 | 387 | }
|
327 | 388 |
|
| 389 | +var ( |
| 390 | + errIPParse = errors.New("parse IP") |
| 391 | + errMultipleIPPerFamily = errors.New("multiple IPs per family") |
| 392 | +) |
| 393 | + |
| 394 | +// newPodKeyToPodIPsMap groups IPs by interface id and returns them indexed by interface id. |
| 395 | +func newPodKeyToPodIPsMap(podInfoByIP map[string]cns.PodInfo) (map[string]podIPs, error) { |
| 396 | + podKeyToPodIPs := make(map[string]podIPs) |
| 397 | + |
| 398 | + for ipStr, podInfo := range podInfoByIP { |
| 399 | + id := podInfo.Key() |
| 400 | + |
| 401 | + ips, ok := podKeyToPodIPs[id] |
| 402 | + if !ok { |
| 403 | + ips.PodInfo = podInfo |
| 404 | + } |
| 405 | + |
| 406 | + ip := net.ParseIP(ipStr) |
| 407 | + switch { |
| 408 | + case ip == nil: |
| 409 | + return nil, errors.Wrapf(errIPParse, "could not parse ip string %q on pod %+v", ipStr, podInfo) |
| 410 | + case ip.To4() != nil: |
| 411 | + if ips.v4IP != nil { |
| 412 | + return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv4 addresses (%v, %v) associated to pod %+v", ips.v4IP, ip, podInfo) |
| 413 | + } |
| 414 | + |
| 415 | + ips.v4IP = ip |
| 416 | + case ip.To16() != nil: |
| 417 | + if ips.v6IP != nil { |
| 418 | + return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv6 addresses (%v, %v) associated to pod %+v", ips.v6IP, ip, podInfo) |
| 419 | + } |
| 420 | + |
| 421 | + ips.v6IP = ip |
| 422 | + } |
| 423 | + |
| 424 | + podKeyToPodIPs[id] = ips |
| 425 | + } |
| 426 | + |
| 427 | + return podKeyToPodIPs, nil |
| 428 | +} |
| 429 | + |
// podIPs are all the IPs associated with a pod, along with pod info.
// It embeds cns.PodInfo so callers can use the pod's accessor methods
// (Key, InterfaceID, InfraContainerID, OrchestratorContext) directly.
type podIPs struct {
	cns.PodInfo
	v4IP net.IP // the pod's IPv4 address; nil when the pod has none
	v6IP net.IP // the pod's IPv6 address; nil when the pod has none
}
| 436 | + |
328 | 437 | // GetNetworkContainerInternal gets network container details.
|
329 | 438 | func (service *HTTPRestService) GetNetworkContainerInternal(
|
330 | 439 | req cns.GetNetworkContainerRequest,
|
|
0 commit comments