Merge pull request #658 from Netcracker/bugfix/calico_typha_allinone
[CPDEV-98973] Scale Typha replicas to 1 in All-in-One
koryaga authored May 22, 2024
2 parents dd89dfc + 8d44738 commit 62b39a3
Showing 9 changed files with 231 additions and 31 deletions.
2 changes: 2 additions & 0 deletions ci/extended_cluster.yaml
@@ -43,6 +43,8 @@ plugins:
install: true
apiserver:
enabled: true
typha:
enabled: true

rbac:
accounts:
2 changes: 2 additions & 0 deletions kubemarine/patches/__init__.py
@@ -22,8 +22,10 @@
from typing import List

from kubemarine.core.patch import Patch
from kubemarine.patches.p1_calico_typha_schedule_control_planes import CalicoTyphaScheduleControlPlane

patches: List[Patch] = [
CalicoTyphaScheduleControlPlane(),
]
"""
List of patches that is sorted according to the Patch.priority() before execution.
66 changes: 66 additions & 0 deletions kubemarine/patches/p1_calico_typha_schedule_control_planes.py
@@ -0,0 +1,66 @@
from textwrap import dedent
from typing import cast, Optional

from kubemarine import plugins
from kubemarine.core import utils
from kubemarine.core.action import Action
from kubemarine.core.patch import RegularPatch
from kubemarine.core.resources import DynamicResources
from kubemarine.kubernetes import deployment
from kubemarine.plugins import builtin, manifest, calico


class TheAction(Action):
def __init__(self) -> None:
super().__init__("Schedule Calico Typha on control-planes")

def run(self, res: DynamicResources) -> None:
cluster = res.cluster()
logger = cluster.log

calico_version = cluster.inventory['plugins']['calico']['version']
if utils.version_key(calico_version)[0:2] >= utils.minor_version_key("v3.27"):
return logger.info("The patch is not relevant for Calico >= v3.27.x.")

if not calico.is_typha_enabled(cluster.inventory):
return logger.info("The patch is skipped as Calico Typha is not enabled.")

processor = cast(Optional[calico.CalicoLess_3_27_ManifestProcessor],
builtin.get_manifest_processor(cluster, manifest.Identity('calico')))
if processor is None:
return logger.warning("Calico manifest is not installed using default procedure. The patch is skipped.")

original_manifest = processor.original_manifest()
if not processor.get_typha_schedule_control_plane_extra_tolerations(original_manifest):
return logger.info("Necessary tolerations already exist in the original manifest. The patch is skipped.")

manifest_ = processor.enrich()
key = "Deployment_calico-typha"
typha_deployment_yaml = manifest_.get_obj(key, patch=False)

typha_deployment = deployment.Deployment(cluster, 'calico-typha', 'kube-system', typha_deployment_yaml)
logger.debug("Apply patched 'calico-typha' deployment")
typha_deployment.apply(cluster.nodes['control-plane'].get_first_member())

logger.debug("Expect 'calico-typha' deployment and pods")
plugins.expect_deployment(cluster, [{'name': 'calico-typha', 'namespace': 'kube-system'}])
plugins.expect_pods(cluster, ['calico-typha'], namespace='kube-system')


class CalicoTyphaScheduleControlPlane(RegularPatch):
def __init__(self) -> None:
super().__init__("calico_typha_schedule_control_planes")

@property
def action(self) -> Action:
return TheAction()

@property
def description(self) -> str:
return dedent(
f"""\
Allow scheduling of Calico Typha pods on control-planes.
This effectively resolves https://github.com/projectcalico/calico/pull/7979 in older versions.
""".rstrip()
)
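
The patch above only acts where it can help: Calico older than v3.27, Typha enabled, and the upstream manifest still missing the control-plane tolerations. A minimal, self-contained sketch of that gating logic follows; the version helper below is a simplified stand-in for kubemarine.core.utils, not the real implementation.

# Standalone sketch of the patch's skip conditions (assumed helper bodies).
from typing import Tuple


def minor_version_key(version: str) -> Tuple[int, int]:
    # "v3.26.4" -> (3, 26); assumes a "vMAJOR.MINOR[.PATCH]" version string
    major, minor = version.lstrip("v").split(".")[:2]
    return int(major), int(minor)


def patch_applies(calico_version: str, typha_enabled: bool, missing_tolerations: int) -> bool:
    if minor_version_key(calico_version) >= minor_version_key("v3.27"):
        return False  # upstream already tolerates control-plane taints
    if not typha_enabled:
        return False  # nothing to reschedule
    return missing_tolerations > 0  # otherwise the deployment is already correct


print(patch_applies("v3.26.4", typha_enabled=True, missing_tolerations=2))  # True
print(patch_applies("v3.27.0", typha_enabled=True, missing_tolerations=2))  # False
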
4 changes: 2 additions & 2 deletions kubemarine/plugins/builtin.py
@@ -17,14 +17,14 @@
from kubemarine.core import log
from kubemarine.core.cluster import KubernetesCluster, EnrichmentStage, enrichment
from kubemarine.plugins import calico, manifest
from kubemarine.plugins.calico import CalicoManifestProcessor, CalicoApiServerManifestProcessor
from kubemarine.plugins.calico import get_calico_manifest_processor, CalicoApiServerManifestProcessor
from kubemarine.plugins.kubernetes_dashboard import get_dashboard_manifest_processor
from kubemarine.plugins.local_path_provisioner import LocalPathProvisionerManifestProcessor
from kubemarine.plugins.manifest import Identity
from kubemarine.plugins.nginx_ingress import get_ingress_nginx_manifest_processor

MANIFEST_PROCESSOR_PROVIDERS: Dict[Identity, manifest.PROCESSOR_PROVIDER] = {
Identity("calico"): CalicoManifestProcessor,
Identity("calico"): get_calico_manifest_processor,
Identity("calico", "apiserver"): CalicoApiServerManifestProcessor,
Identity("nginx-ingress-controller"): get_ingress_nginx_manifest_processor,
Identity("kubernetes-dashboard"): get_dashboard_manifest_processor,
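
The registry value type (manifest.PROCESSOR_PROVIDER) is a callable, so swapping the CalicoManifestProcessor class for the get_calico_manifest_processor factory is invisible to call sites. Below is a hypothetical, self-contained illustration of that indirection; the class names are borrowed from the plugin, but the bodies and constructor signature are simplified assumptions.

from typing import Callable, Dict


class CalicoManifestProcessor:
    def __init__(self, inventory: dict) -> None:
        self.inventory = inventory


class CalicoLess_3_27_ManifestProcessor(CalicoManifestProcessor):
    pass


def get_calico_manifest_processor(inventory: dict) -> CalicoManifestProcessor:
    major, minor = (int(part) for part in
                    inventory["plugins"]["calico"]["version"].lstrip("v").split(".")[:2])
    cls = CalicoLess_3_27_ManifestProcessor if (major, minor) < (3, 27) else CalicoManifestProcessor
    return cls(inventory)


# Both a class and a factory function satisfy the same provider signature.
PROVIDERS: Dict[str, Callable[[dict], CalicoManifestProcessor]] = {
    "calico": get_calico_manifest_processor,
}

processor = PROVIDERS["calico"]({"plugins": {"calico": {"version": "v3.26.4"}}})
print(type(processor).__name__)  # CalicoLess_3_27_ManifestProcessor
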
39 changes: 38 additions & 1 deletion kubemarine/plugins/calico.py
@@ -336,13 +336,17 @@ def enrich_deployment_calico_typha(self, manifest: Manifest) -> None:
self.log.verbose(f"The {key} has been patched in 'spec.replicas' with '{val}'")

self.enrich_node_selector(manifest, key, plugin_service='typha')
self.enrich_tolerations(manifest, key, plugin_service='typha', extra_tolerations=default_tolerations)
self.enrich_deployment_calico_typha_tolerations(manifest, default_tolerations)
self.enrich_image_for_container(manifest, key,
plugin_service='typha', container_name='calico-typha', is_init_container=False)
self.enrich_deployment_calico_typha_container_env(manifest)
self.enrich_resources_for_container(manifest, key,
plugin_service='typha', container_name='calico-typha')

def enrich_deployment_calico_typha_tolerations(self, manifest: Manifest, default_tolerations: List[dict]) -> None:
key = "Deployment_calico-typha"
self.enrich_tolerations(manifest, key, plugin_service='typha', extra_tolerations=default_tolerations)

def enrich_deployment_calico_typha_container_env(self, manifest: Manifest) -> None:
key = "Deployment_calico-typha"
env_ensure: Dict[str, str] = {
@@ -463,6 +467,28 @@ def get_enrichment_functions(self) -> List[EnrichmentFunction]:
self.enrich_metrics,
]


class CalicoLess_3_27_ManifestProcessor(CalicoManifestProcessor):
def enrich_deployment_calico_typha_tolerations(self, manifest: Manifest, default_tolerations: List[dict]) -> None:
# "backport" issue https://github.com/projectcalico/calico/pull/7979
# to allow Typha to be scheduled on control-planes
for toleration in self.get_typha_schedule_control_plane_extra_tolerations(manifest):
default_tolerations.insert(0, toleration)

super().enrich_deployment_calico_typha_tolerations(manifest, default_tolerations)

def get_typha_schedule_control_plane_extra_tolerations(self, manifest: Manifest) -> List[dict]:
key = "Deployment_calico-typha"
source_yaml = manifest.get_obj(key, patch=True)

schedule_control_plane = [{'effect': 'NoExecute', 'operator': 'Exists'},
{'effect': 'NoSchedule', 'operator': 'Exists'}]

original_tolerations: List[dict] = source_yaml['spec']['template']['spec'].get('tolerations', [])
return [toleration for toleration in schedule_control_plane
if toleration not in original_tolerations]


service_account_secret_calico_node = dedent("""\
apiVersion: v1
kind: Secret
@@ -484,6 +510,17 @@ def get_enrichment_functions(self) -> List[EnrichmentFunction]:
type: kubernetes.io/service-account-token
""")


def get_calico_manifest_processor(logger: log.VerboseLogger, inventory: dict,
yaml_path: Optional[str] = None, destination: Optional[str] = None) -> Processor:
version: str = inventory['plugins']['calico']['version']
kwargs = {'original_yaml_path': yaml_path, 'destination_name': destination}
if utils.version_key(version)[0:2] < utils.minor_version_key("v3.27"):
return CalicoLess_3_27_ManifestProcessor(logger, inventory, **kwargs)

return CalicoManifestProcessor(logger, inventory, **kwargs)


class CalicoApiServerManifestProcessor(Processor):
def __init__(self, logger: log.VerboseLogger, inventory: dict,
original_yaml_path: Optional[str] = None, destination_name: Optional[str] = None):
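
CalicoLess_3_27_ManifestProcessor backports projectcalico/calico#7979 by prepending only those operator-Exists tolerations that the upstream manifest does not already carry, so newer manifests stay untouched. A standalone sketch of that filter, using plain dictionaries rather than the kubemarine Manifest API:

from typing import Dict, List

SCHEDULE_CONTROL_PLANE: List[Dict[str, str]] = [
    {"effect": "NoExecute", "operator": "Exists"},
    {"effect": "NoSchedule", "operator": "Exists"},
]


def extra_tolerations(original: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # only tolerations missing from the upstream Deployment are backported
    return [t for t in SCHEDULE_CONTROL_PLANE if t not in original]


def merged_tolerations(original: List[Dict[str, str]],
                       defaults: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # mirrors enrich_deployment_calico_typha_tolerations: the extras are prepended
    return extra_tolerations(original) + defaults


print(merged_tolerations(
    original=[{"key": "CriticalAddonsOnly", "operator": "Exists"}],
    defaults=[{"key": "node.kubernetes.io/network-unavailable", "effect": "NoSchedule"}],
))
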
14 changes: 9 additions & 5 deletions kubemarine/plugins/manifest.py
@@ -238,19 +238,23 @@ def validate_original(self, manifest: Manifest) -> None:
self.log.verbose(f"The current version of original yaml does not include "
f"the following object: {key}")

def enrich(self) -> Manifest:
def original_manifest(self) -> Manifest:
"""
The method implements full processing for the plugin main manifest.
get original YAML and parse it into list of objects
"""

# get original YAML and parse it into list of objects
try:
with utils.open_utf8(self.manifest_path, 'r') as stream:
manifest = Manifest(self.manifest_identity, stream)
return Manifest(self.manifest_identity, stream)
except Exception as exc:
raise Exception(f"Failed to load {self.manifest_identity.repr_id()} from {self.manifest_path} "
f"for {self.plugin_name!r} plugin") from exc

def enrich(self) -> Manifest:
"""
The method implements full processing for the plugin main manifest.
"""

manifest = self.original_manifest()
self.validate_original(manifest)

# call enrichment functions one by one
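
Splitting original_manifest() out of enrich() lets the patch inspect the unmodified upstream YAML before deciding whether to act, while enrich() keeps its previous behaviour. A toy, self-contained sketch of the resulting shape (simplified classes, assuming PyYAML is available; the real classes live in kubemarine/plugins/manifest.py):

import io
from typing import Callable, List

import yaml  # assumes PyYAML is installed


class Manifest:
    def __init__(self, stream) -> None:
        self.objects = [obj for obj in yaml.safe_load_all(stream) if obj]


class Processor:
    def __init__(self, manifest_text: str,
                 enrichment_functions: List[Callable[["Manifest"], None]]) -> None:
        self.manifest_text = manifest_text
        self.enrichment_functions = enrichment_functions

    def original_manifest(self) -> Manifest:
        # parse the unmodified upstream YAML; callers may inspect it read-only
        return Manifest(io.StringIO(self.manifest_text))

    def enrich(self) -> Manifest:
        # full processing still starts from the raw manifest
        manifest = self.original_manifest()
        for enrich in self.enrichment_functions:
            enrich(manifest)
        return manifest


def add_label(m: Manifest) -> None:
    m.objects[0]["metadata"].setdefault("labels", {})["patched"] = "true"


proc = Processor("kind: Deployment\nmetadata:\n  name: calico-typha\n", [add_label])
print(proc.original_manifest().objects[0]["metadata"])  # no labels yet
print(proc.enrich().objects[0]["metadata"])             # labels added by enrichment
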
10 changes: 8 additions & 2 deletions kubemarine/resources/configurations/defaults.yaml
@@ -617,8 +617,14 @@ plugins:
typha:
# enabled by default for envs with nodes > 3
enabled: '{% if (nodes | select("has_roles", ["control-plane", "worker"]) | list | length) < 4 %}false{% else %}true{% endif %}'
# let's start from 2 replicas and increment it every 50 nodes
replicas: '{{ (((nodes | select("has_roles", ["control-plane", "worker"]) | list | length) / 50) + 2) | round(0, "floor") | int }}'
# If Typha is disabled, set 0 replicas to avoid sudden configuration changes during add/remove nodes.
# If enabled, start from 2 replicas and add one more for every 50 nodes.
# In the special case of a single node, scale to 1 replica.
replicas: "\
{% if plugins.calico.typha.enabled | is_true %}\
{% set kubernetes_nodes = nodes | select('has_roles', ['control-plane', 'worker']) | list | length %}\
{{ (1 + ([kubernetes_nodes - 1, 1] | min) + kubernetes_nodes / 50) | round(0, 'floor') | int }}\
{% else %}0{% endif %}"
image: 'calico/typha:{{ plugins.calico.version }}'
nodeSelector:
kubernetes.io/os: linux
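
The new Jinja2 expression is easier to check when written out in plain Python. The sketch below only reproduces its arithmetic (it is not how Kubemarine evaluates templates): 0 replicas while Typha is disabled, 1 on a single Kubernetes node, otherwise 2 plus one more replica per 50 nodes.

import math


def typha_replicas(kubernetes_nodes: int, typha_enabled: bool) -> int:
    if not typha_enabled:
        return 0
    # 1 + min(nodes - 1, 1) + nodes / 50, rounded down
    return math.floor(1 + min(kubernetes_nodes - 1, 1) + kubernetes_nodes / 50)


for nodes in (1, 2, 49, 50, 100):
    print(nodes, typha_replicas(nodes, typha_enabled=True))
# 1 -> 1, 2 -> 2, 49 -> 2, 50 -> 3, 100 -> 4
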
121 changes: 104 additions & 17 deletions test/unit/plugins/test_calico.py
@@ -233,20 +233,25 @@ def test_deployment_calico_typha(self):
cluster = demo.new_cluster(inventory)
manifest = self.enrich_yaml(cluster)
target_yaml = self.get_obj(manifest, "Deployment_calico-typha")
self.assertEqual(2, target_yaml['spec']['replicas'], "Unexpected number of typha replicas")
self.assertEqual(1, target_yaml['spec']['replicas'], "Unexpected number of typha replicas")

template_spec = target_yaml['spec']['template']['spec']
container = self._get_calico_typha_container(manifest)
expected_image = f"example.registry/calico/typha:{self.expected_image_tag(k8s_version, 'version')}"
self.assertEqual(expected_image, container['image'], "Unexpected calico-typha image")
self.assertEqual({"kubernetes.io/os": "something"}, template_spec['nodeSelector'],
"Unexpected calico-typha nodeSelector")
self.assertIn({'key': 'node.kubernetes.io/network-unavailable', 'effect': 'NoSchedule'},
template_spec['tolerations'],
"Default calico-typha toleration is not present")
self.assertIn({'key': 'node.kubernetes.io/network-unavailable', 'effect': 'NoExecute'},
template_spec['tolerations'],
"Default calico-typha toleration is not present")

default_tolerations = [
{'key': 'node.kubernetes.io/network-unavailable', 'effect': 'NoSchedule'},
{'key': 'node.kubernetes.io/network-unavailable', 'effect': 'NoExecute'},
{'effect': 'NoExecute', 'operator': 'Exists'},
{'effect': 'NoSchedule', 'operator': 'Exists'},
]
for toleration in default_tolerations:
self.assertEqual(1, sum(1 for t in template_spec['tolerations'] if t == toleration),
"Default calico-typha toleration is not present")

self.assertIn({"key": 'something', "effect": "NoSchedule"}, template_spec['tolerations'],
"Custom calico-typha toleration is not present")

@@ -431,19 +436,39 @@ def get_default_expect_config(typha_enabled: bool) -> dict:


class EnrichmentTest(unittest.TestCase):
def test_expect_typha_default(self):
for nodes in (3, 4):
def test_default_typha_enrichment(self):
for nodes in (1, 3, 4, 49, 50):
with self.subTest(f"Kubernetes nodes: {nodes}"):
scheme = {'master': [], 'worker': []}
for i in range(nodes):
scheme['master'].append(f'master-{i+1}')
scheme['worker'].append(f'master-{i+1}')

inventory = demo.generate_inventory(**scheme)
inventory.setdefault('plugins', {})['calico'] = {
'install': True,
inventory = self._inventory(nodes)
cluster = demo.new_cluster(inventory)

typha = cluster.inventory['plugins']['calico']['typha']

expected_enabled = nodes > 3
self.assertEqual(expected_enabled, typha['enabled'])

expected_replicas = 0 if nodes <= 3 else 2 if 3 < nodes < 50 else 3
self.assertEqual(expected_replicas, typha['replicas'])

def test_replicas_default_typha_enabled(self):
for nodes in (1, 2, 49, 50):
with self.subTest(f"Kubernetes nodes: {nodes}"):
inventory = self._inventory(nodes)
inventory['plugins']['calico']['typha'] = {
'enabled': True
}
cluster = demo.new_cluster(inventory)

typha = cluster.inventory['plugins']['calico']['typha']
self.assertEqual(True, typha['enabled'])

expected_replicas = 1 if nodes == 1 else 2 if 1 < nodes < 50 else 3
self.assertEqual(expected_replicas, typha['replicas'])

def test_expect_typha_default(self):
for nodes in (3, 4):
with self.subTest(f"Kubernetes nodes: {nodes}"):
inventory = self._inventory(nodes)
cluster = demo.new_cluster(inventory)

expected_expect_step = get_default_expect_config(nodes > 3)
@@ -453,6 +478,20 @@ def test_expect_typha_default(self):
self.assertEqual([expected_expect_step, expected_expect_step], actual_expect_steps,
"Unexpected expect procedures")

@staticmethod
def _inventory(nodes: int) -> dict:
scheme = {'master': [], 'worker': []}
for i in range(nodes):
scheme['master'].append(f'master-{i + 1}')
scheme['worker'].append(f'master-{i + 1}')

inventory = demo.generate_inventory(**scheme)
inventory.setdefault('plugins', {})['calico'] = {
'install': True,
}

return inventory


class RedeployIfNeeded(unittest.TestCase):
def prepare_context(self, procedure: str):
@@ -581,6 +620,54 @@ def test_add_remove_balancer_redeploy_not_needed(self):

self._run_and_check(False)

def test_add_remove_second_kubernetes_node_redeploy_not_needed(self):
# pylint: disable=attribute-defined-outside-init

for role in ('master', 'worker'):
with self.subTest(f'Role: {role}'):
scheme = {'balancer': 1, 'master': ['master-1'], 'worker': ['master-1']}
add_node_name = 'node-1'
if role == 'master':
scheme['master'].append(add_node_name)
else:
scheme['worker'].append(add_node_name)

self.prepare_context('add_node')
self.prepare_inventory(scheme, 'add_node', add_node_name)

res = self._run_and_check(False)

self.inventory = res.inventory()
self.prepare_context('remove_node')

self._run_and_check(False)

def test_add_remove_second_kubernetes_node_typha_enabled_redeploy_needed(self):
# pylint: disable=attribute-defined-outside-init

for role in ('master', 'worker'):
for typha_replicas_redefined in (False, True):
with self.subTest(f'Role: {role}, Typha replicas redefined: {typha_replicas_redefined}'):
scheme = {'balancer': 1, 'master': ['master-1'], 'worker': ['master-1']}
add_node_name = 'node-1'
if role == 'master':
scheme['master'].append(add_node_name)
else:
scheme['worker'].append(add_node_name)

self.prepare_context('add_node')
self.prepare_inventory(scheme, 'add_node', add_node_name)
self.inventory['plugins']['calico']['typha']['enabled'] = True
if typha_replicas_redefined:
self.inventory['plugins']['calico']['typha']['replicas'] = 2

res = self._run_and_check(not typha_replicas_redefined)

self.inventory = res.inventory()
self.prepare_context('remove_node')

self._run_and_check(not typha_replicas_redefined)

def test_add_remove_50th_kubernetes_node_redeploy_needed(self):
# pylint: disable=attribute-defined-outside-init

4 changes: 0 additions & 4 deletions test/unit/test_defaults.py
@@ -256,10 +256,6 @@ def test_default_enrichment(self):
['plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options']['SystemdCgroup'])
self.assertNotIn('min', inventory['services']['kubeadm_kube-proxy']['conntrack'])

typha = inventory['plugins']['calico']['typha']
self.assertEqual(False, typha['enabled'])
self.assertEqual(2, typha['replicas'])

nginx_ingress_ports = inventory['plugins']['nginx-ingress-controller']['ports']
self.assertEqual(20080, [port for port in nginx_ingress_ports if port['name'] == 'http'][0]['hostPort'])
self.assertEqual(20443, [port for port in nginx_ingress_ports if port['name'] == 'https'][0]['hostPort'])
