diff --git a/documentation/Installation.md b/documentation/Installation.md index 20a3c72d8..54c336c95 100644 --- a/documentation/Installation.md +++ b/documentation/Installation.md @@ -1901,7 +1901,11 @@ The following associations are used by default: **Notes**: * By default, the packages' versions are installed according to the Kubernetes version specified in the [Supported versions](#supported-versions) section. -* In the procedure for adding nodes, the package versions are taken from the current nodes in order to match the nodes in the cluster. For example, if `containerd.io-1.6.4-1` is installed on the nodes of the cluster, this version is installed on the new node. This behavior can be changed by setting the `cache_versions` option to `false`. The package versions are then used only with the template from the `associations` section. +* In the procedure for adding nodes, the package versions are taken from the current nodes to match the nodes in the cluster. + For example, if `containerd.io-1.6.4-1` is installed on the nodes of the cluster, this version is installed on the new node. + This behavior can be changed by setting the `cache_versions` option to "false". + The package versions are then used only with the template from the `associations` section. + The option can be used both in global `services.packages` and in specific associations sections. The following is an example of overriding docker associations: @@ -1911,6 +1915,7 @@ services: cache_versions: true associations: docker: + cache_versions: false executable_name: 'docker' package_name: - docker-ce-19* @@ -4880,6 +4885,10 @@ After any procedure is completed, a final inventory with all the missing variabl This inventory can be found in the `cluster_finalized.yaml` file in the working directory, and can be passed as a source inventory in future runs of KubeMarine procedures. 
+**Note**: The `cluster_finalized.yaml` inventory file is aimed to reflect the current cluster state together with the KubeMarine version using which it is created. +This in particular means that the file cannot be directly used with a different KubeMarine version. +Though, it still can be migrated together with the managed cluster using the [Kubemarine Migration Procedure](/documentation/Maintenance.md#kubemarine-migration-procedure). + In the file, you can see not only the compiled inventory, but also some converted values depending on what is installed on the cluster. For example, consider the following package's origin configuration: @@ -4904,20 +4913,21 @@ services: - policycoreutils-python-utils ``` -The above configuration is converted to the following finalized configuration: +The above configuration is converted to the following finalized configuration, provided that the cluster is based on RHEL nodes: ```yaml services: packages: associations: - docker: - executable_name: 'docker' - package_name: - - docker-ce-19.03.15-3.el7.x86_64 - - docker-ce-cli-19.03.15-3.el7.x86_64 - - containerd.io-1.4.6-3.1.el7.x86_64 - service_name: 'docker' - config_location: '/etc/docker/daemon.json' + rhel: + docker: + executable_name: 'docker' + package_name: + - docker-ce-19.03.15-3.el7.x86_64 + - docker-ce-cli-19.03.15-3.el7.x86_64 + - containerd.io-1.4.6-3.1.el7.x86_64 + service_name: 'docker' + config_location: '/etc/docker/daemon.json' install: include: - conntrack @@ -4929,6 +4939,11 @@ services: ``` **Note**: Some of the packages are impossible to be detected in the system, therefore such packages remain unchanged. +The same rule is applied if two different package versions are detected on different nodes. +Also, see the `cache_versions` option in the [associations](#associations) section. + +**Note**: After some time is passed, the detected package versions might disappear from the repository. 
+Direct using of the `cluster_finalized.yaml` file in procedures like `install` or `add_node` might be impossible due to this reason, and would require a manual intervention. The same applies to the VRRP interfaces. For example, the following origin configuration without interfaces: diff --git a/kubemarine/apparmor.py b/kubemarine/apparmor.py index 1ffbbbdb6..0ec021af3 100644 --- a/kubemarine/apparmor.py +++ b/kubemarine/apparmor.py @@ -112,7 +112,7 @@ def configure_apparmor(group, expected_profiles): def setup_apparmor(group): log = group.cluster.log - if group.get_nodes_os(suppress_exceptions=True) != 'debian': + if group.get_nodes_os() != 'debian': log.debug("Skipped - Apparmor is supported only on Ubuntu/Debian") return diff --git a/kubemarine/audit.py b/kubemarine/audit.py index b077fa722..cadc18409 100644 --- a/kubemarine/audit.py +++ b/kubemarine/audit.py @@ -54,7 +54,7 @@ def install(group: NodeGroup, enable_service: bool = True, force: bool = False) return # This method handles cluster with multiple os, exceptions should be suppressed - if not force and group.get_nodes_os(suppress_exceptions=True) in ['rhel', 'rhel8']: + if not force and group.get_nodes_os() in ['rhel', 'rhel8']: log.debug('Auditd installation is not required on RHEL nodes') return diff --git a/kubemarine/core/cluster.py b/kubemarine/core/cluster.py index 335faf107..805ec9f14 100755 --- a/kubemarine/core/cluster.py +++ b/kubemarine/core/cluster.py @@ -14,7 +14,7 @@ import re from copy import deepcopy -from typing import Dict, List, Union +from typing import Dict, List, Union, Iterable, Tuple import fabric import yaml @@ -89,26 +89,32 @@ def make_group(self, ips: List[_AnyConnectionTypes]) -> NodeGroup: raise Exception('Unsupported connection object type') return NodeGroup(connections, self) - def get_addresses_from_node_names(self, node_names: List[str]) -> dict: - result = {} + def get_access_address_from_node(self, node: dict): + """ + Returns address which should be used to connect to 
the node via Fabric. + The address also can be used as unique identifier of the node. + """ + address = node.get('connect_to') + if address is None: + address = node.get('address') + if address is None: + address = node.get('internal_address') + + return address + + def get_addresses_from_node_names(self, node_names: List[str]) -> List[str]: + result = [] for node in self.inventory["nodes"]: for requested_node_name in node_names: if requested_node_name == node['name']: - result[node['name']] = { - 'address': node.get('address'), - 'internal_address': node.get('internal_address'), - 'connect_to': node.get('connect_to') - } + result.append(self.get_access_address_from_node(node)) return result def get_node(self, host: Union[str, fabric.connection.Connection]) -> dict: return self.make_group([host]).get_first_member(provide_node_configs=True) def make_group_from_nodes(self, node_names: List[str]) -> NodeGroup: - addresses = self.get_addresses_from_node_names(node_names) - ips = [] - for item in list(addresses.values()): - ips.append(item['connect_to']) + ips = self.get_addresses_from_node_names(node_names) return self.make_group(ips) def create_group_from_groups_nodes_names(self, groups_names: List[str], nodes_names: List[str]) -> NodeGroup: @@ -139,9 +145,6 @@ def is_task_completed(self, task_path) -> bool: from kubemarine.core import flow return flow.is_task_completed(self, task_path) - def get_final_inventory(self): - return utils.get_final_inventory(self) - def get_facts_enrichment_fns(self): return [ "kubemarine.kubernetes.add_node_enrichment", @@ -177,23 +180,7 @@ def detect_nodes_context(self) -> dict: self.log.verbose('OS family check finished') self.log.debug('Detecting nodes context finished!') - return {k: deepcopy(self.context[k]) for k in ('nodes', 'os')} - - def _gather_facts_after(self): - self.log.debug('Gathering facts after tasks execution started...') - - self.remove_invalid_cri_config(self.inventory) - # Method 
"kubemarine.system.is_multiple_os_detected" is not used because it detects OS family for new nodes - # only, while package versions caching performs on all nodes. - # Cache packages only if it's set in configuration - if self.inventory['services']['packages']['cache_versions']: - if self.nodes['all'].get_accessible_nodes().get_nodes_os(suppress_exceptions=True, force_all_nodes=True) != 'multiple': - self.cache_package_versions() - self.log.verbose('Package versions detection finished') - else: - self.log.verbose('Package versions detection cancelled - cluster in multiple OS state') - - self.log.debug('Gathering facts after tasks execution finished!') + return deepcopy(self.context['nodes']) def _check_online_nodes(self): """ @@ -223,49 +210,113 @@ def _check_accessible_nodes(self): raise Exception(f"{not_accessible_online.get_hosts()} are not accessible through ssh. " f"Check ssh credentials.") - def get_associations_for_os(self, os_family): - package_associations = self.inventory['services']['packages']['associations'] - active_os_family = self.context.get("os") - if active_os_family != os_family: - package_associations = package_associations[os_family] - - return package_associations - def get_os_family_for_node(self, host: str) -> str: node_context = self.context['nodes'].get(host) if not node_context or not node_context.get('os', {}).get('family'): raise Exception('Node %s do not contain necessary context data' % host) return node_context['os']['family'] - def get_associations_for_node(self, host: str) -> dict: + def get_os_family_for_nodes(self, hosts: Iterable[str]) -> str: + """ + Returns the detected operating system family for hosts. + + :return: Detected OS family, possible values: "debian", "rhel", "rhel8", "multiple", "unknown", "unsupported". 
+ """ + os_families = {self.get_os_family_for_node(host) for host in hosts} + if len(os_families) > 1: + return 'multiple' + elif len(os_families) == 0: + raise Exception('Cannot get os family for empty nodes list') + else: + return list(os_families)[0] + + def get_os_family(self) -> str: + """ + Returns common OS family name from all final remote hosts. + The method can be used during enrichment when NodeGroups are not yet calculated. + + :return: Detected OS family, possible values: "debian", "rhel", "rhel8", "multiple", "unknown", "unsupported". + """ + hosts_detect_os_family = [] + for node in self.inventory['nodes']: + host = self.get_access_address_from_node(node) + if 'remove_node' not in node['roles']: + hosts_detect_os_family.append(host) + + return self.get_os_family_for_nodes(hosts_detect_os_family) + + def get_os_identifiers(self) -> Dict[str, Tuple[str, str]]: """ - Returns all packages associations for specific node + For each final node of the cluster, returns a tuple of OS (family, version). + """ + nodes_check_os = self.nodes['all'].get_final_nodes() + os_ids = {} + for host in nodes_check_os.get_hosts(): + os_details = self.context['nodes'][host]['os'] + os_ids[host] = (os_details['family'], os_details['version']) + + return os_ids + + def _get_associations_for_os(self, os_family: str, package: str) -> dict: + if os_family in ('unknown', 'unsupported', 'multiple'): + raise Exception("Failed to get associations for unsupported or multiple OS families") + + associations = self.inventory['services']['packages']['associations'][os_family].get(package) + if associations is None: + raise Exception(f'Failed to get associations for package "{package}"') + + return associations + + def get_associations_for_node(self, host: str, package: str) -> dict: + """ + Returns all packages associations for specific node. 
+ :param host: The address of the node for which required to find the associations + :param package: The package name to get the associations for :return: Dict with packages and their associations """ node_os_family = self.get_os_family_for_node(host) - return self.get_associations_for_os(node_os_family) + return self._get_associations_for_os(node_os_family, package) + + def _get_package_associations_for_os(self, os_family: str, package: str, association_key: str) -> str or list: + associations = self._get_associations_for_os(os_family, package) + association_value = associations.get(association_key) + if association_value is None: + raise Exception(f'Failed to get association "{association_key}" for package "{package}"') + if not isinstance(association_value, str) and not isinstance(association_value, list): + raise Exception(f'Unsupported association "{association_key}" value type for package "{package}", ' + f'got: {str(association_value)}') + + return association_value + + def get_package_association(self, package: str, association_key: str) -> str or list: + """ + Returns the specified association for the specified package from inventory for the cluster. + The method can be used only if cluster has nodes with the same and supported OS family. + + :param package: The package name to get the association for + :param association_key: Association key to get + :return: Association string or list value + """ + os_family = self.get_os_family() + return self._get_package_associations_for_os(os_family, package, association_key) def get_package_association_for_node(self, host: str, package: str, association_key: str) -> str or list: """ - Returns the specified association for the specified package from inventory for specific node + Returns the specified association for the specified package from inventory for specific node. 
+ :param host: The address of the node for which required to find the association :param package: The package name to get the association for :param association_key: Association key to get :return: Association string or list value """ - associations = self.get_associations_for_node(host) - association_value = associations.get(package, {}).get(association_key) - if association_value is None: - raise Exception(f'Failed to get association "{association_key}" for package "{package}"') - if not isinstance(association_value, str) and not isinstance(association_value, list): - raise Exception(f'Unsupported association "{association_key}" value type for package "{package}", ' - f'got: {str(association_value)}') - return association_value + os_family = self.get_os_family_for_node(host) + return self._get_package_associations_for_os(os_family, package, association_key) def get_package_association_for_group(self, group: NodeGroup, package: str, association_key: str) -> dict: """ - Returns the specified association dict for the specified package from inventory for entire NodeGroup + Returns the specified association dict for the specified package from inventory for entire NodeGroup. + :param group: NodeGroup for which required to find the association :param package: The package name to get the association for :param association_key: Association key to get @@ -282,6 +333,7 @@ def get_package_association_str_for_group(self, group: NodeGroup, """ Returns the specified association string or list for the specified package from inventory for entire NodeGroup. If association value is different between some nodes, an exception will be thrown. 
+ :param group: NodeGroup for which required to find the association :param package: The package name to get the association for :param association_key: Association key to get @@ -293,80 +345,29 @@ def get_package_association_str_for_group(self, group: NodeGroup, return results_values[0] raise Exception(f'Too many values returned for package associations str "{association_key}" for package "{package}"') - def cache_package_versions(self): - # todo consider nodes not having sudo privileges - from kubemarine import packages - detected_packages = packages.detect_installed_packages_version_groups( - self.nodes['all'].get_unchanged_nodes().get_online_nodes(True)) - for os_family in ['debian', 'rhel', 'rhel8']: - if self.inventory['services']['packages']['associations'].get(os_family): - del self.inventory['services']['packages']['associations'][os_family] - for association_name, associated_params in self.inventory['services']['packages']['associations'].items(): - associated_packages = associated_params.get('package_name', []) - packages_list = [] - final_packages_list = [] - if isinstance(associated_packages, str): - packages_list.append(packages.get_package_name(self.nodes['all'].get_final_nodes().get_nodes_os(), associated_packages)) - elif isinstance(associated_packages, list): - associated_packages_clean = [] - for package in associated_packages: - associated_packages_clean.append(packages.get_package_name(self.nodes['all'].get_final_nodes().get_nodes_os(), package)) - packages_list = packages_list + associated_packages_clean - else: - raise Exception('Unsupported associated packages object type') - - for package in packages_list: - detected_package_versions = list(detected_packages[package].keys()) - for version in detected_package_versions: - # add package version to list only if it was found as installed - # skip version, which ended with special symbol = or - - # (it is possible in some cases to receive "containerd=" version) - if "not installed" not in version 
and version[-1] != '=' and version[-1] != '-': - final_packages_list.append(version) - - # if there no versions detected, then set package version to default - if not final_packages_list: - final_packages_list = [package] - - # if non-multiple value, then convert to simple string - # packages can contain multiple package values, like docker package - # (it has docker-ce, docker-cli and containerd.io packages for installation) - if len(final_packages_list) == 1: - final_packages_list = final_packages_list[0] - else: - final_packages_list = list(set(final_packages_list)) - - associated_params['package_name'] = final_packages_list - # packages from direct installation section - if self.inventory['services']['packages'].get('install', {}): - final_packages_list = [] - for package in self.inventory['services']['packages']['install']['include']: - package_versions_list = [] - if package in self.globals['compatibility_map']['software']: - detected_package_versions = list(detected_packages[package].keys()) - for version in detected_package_versions: - # skip version, which ended with special symbol = or - - # (it is possible in some cases) - if "not installed" not in version and version[-1] != '=' and version[-1] != '-': - # add package version to list only if it was found as installed - package_versions_list.append(version) - # if there no versions detected, then set package version to default - if not package_versions_list: - package_versions_list = [package] - final_packages_list = final_packages_list + package_versions_list - self.inventory['services']['packages']['install']['include'] = list(set(final_packages_list)) - return detected_packages - - def dump_finalized_inventory(self): - self._gather_facts_after() - # TODO: rewrite the following lines as deenrichment functions like common enrichment mechanism + def make_finalized_inventory(self): from kubemarine.core import defaults from kubemarine.procedures import remove_node - from kubemarine import controlplane - 
prepared_inventory = remove_node.remove_node_finalize_inventory(self, self.inventory) - prepared_inventory = defaults.prepare_for_dump(prepared_inventory, copy=False) - prepared_inventory = self.escape_jinja_characters_for_inventory(prepared_inventory) - inventory_for_dump = controlplane.controlplane_finalize_inventory(self, prepared_inventory) + from kubemarine import controlplane, cri, packages + + cluster_finalized_functions = { + packages.cache_package_versions, + packages.remove_unused_os_family_associations, + cri.remove_invalid_cri_config, + remove_node.remove_node_finalize_inventory, + defaults.escape_jinja_characters_for_inventory, + controlplane.controlplane_finalize_inventory, + } + + # copying is currently not necessary, but it is possible in general. + prepared_inventory = self.inventory + for finalize_fn in cluster_finalized_functions: + prepared_inventory = finalize_fn(self, prepared_inventory) + + return defaults.prepare_for_dump(prepared_inventory, copy=False) + + def dump_finalized_inventory(self): + inventory_for_dump = self.make_finalized_inventory() data = yaml.dump(inventory_for_dump) finalized_filename = "cluster_finalized.yaml" utils.dump_file(self, data, finalized_filename) @@ -378,34 +379,7 @@ def preserve_inventory(self): cluster_storage = utils.ClusterStorage(self) cluster_storage.make_dir() if self.context.get('initial_procedure') == 'add_node': - cluster_storage.collect_info_all_control_plane() - cluster_storage.upload_info_new_node() + cluster_storage.upload_info_new_control_planes() cluster_storage.collect_procedure_info() cluster_storage.compress_and_upload_archive() cluster_storage.rotation_file() - - def escape_jinja_characters_for_inventory(self, obj): - if isinstance(obj, dict): - for key, value in obj.items(): - obj[key] = self.escape_jinja_characters_for_inventory(value) - elif isinstance(obj, list): - for key, value in enumerate(obj): - obj[key] = self.escape_jinja_characters_for_inventory(value) - elif isinstance(obj, str): 
- obj = self.escape_jinja_character(obj) - return obj - - def escape_jinja_character(self, value): - if '{{' in value and '}}' in value and re.search(jinja_query_regex, value): - matches = re.findall(jinja_query_regex, value) - for match in matches: - # TODO: rewrite to correct way of match replacement: now it can cause "{raw}{raw}xxx.." circular bug - value = value.replace(match, '{% raw %}'+match+'{% endraw %}') - return value - - def remove_invalid_cri_config(self, inventory): - if inventory['services']['cri']['containerRuntime'] == 'docker': - if inventory['services']['cri'].get('containerdConfig'): - del inventory['services']['cri']['containerdConfig'] - elif inventory['services']['cri'].get('dockerConfig'): - del inventory['services']['cri']['dockerConfig'] diff --git a/kubemarine/core/defaults.py b/kubemarine/core/defaults.py index 9028a0b8c..58efb19f5 100755 --- a/kubemarine/core/defaults.py +++ b/kubemarine/core/defaults.py @@ -22,7 +22,7 @@ from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.errors import KME from kubemarine import jinja -from kubemarine.core import utils +from kubemarine.core import utils, static from kubemarine.core.yaml_merger import default_merger from kubemarine import controlplane @@ -78,9 +78,10 @@ invalid_node_name_regex = re.compile("[^a-z-.\\d]", re.M) escaped_expression_regex = re.compile('({%[\\s*|]raw[\\s*|]%}.*?{%[\\s*|]endraw[\\s*|]%})', re.M) +jinja_query_regex = re.compile("{{ .* }}", re.M) -def apply_defaults(inventory, cluster): +def apply_defaults(inventory, cluster: KubernetesCluster): recursive_apply_defaults(supported_defaults, inventory) for i, node in enumerate(inventory["nodes"]): @@ -93,11 +94,7 @@ def apply_defaults(inventory, cluster): raise Exception('Node name \"%s\" contains invalid characters. 
A DNS-1123 subdomain must consist of lower ' 'case alphanumeric characters, \'-\' or \'.\'' % node_name) - address = node.get('connect_to') - if address is None: - address = node.get('address') - if address is None: - address = node.get('internal_address') + address = cluster.get_access_address_from_node(node) if address is None: raise Exception('Node %s do not have any address' % node_name) @@ -415,7 +412,7 @@ def calculate_nodegroups(inventory, cluster): return inventory -def enrich_inventory(cluster, custom_inventory, apply_fns=True, make_dumps=True, custom_fns=None): +def enrich_inventory(cluster, custom_inventory, make_dumps=True, custom_fns=None): with open(utils.get_resource_absolute_path('resources/configurations/defaults.yaml', script_relative=True), 'r') as stream: base_inventory = yaml.safe_load(stream) @@ -424,23 +421,22 @@ def enrich_inventory(cluster, custom_inventory, apply_fns=True, make_dumps=True, # it is necessary to temporary put half-compiled inventory to cluster inventory field cluster._inventory = inventory - if apply_fns: - if custom_fns: - enrichment_functions = custom_fns - else: - enrichment_functions = DEFAULT_ENRICHMENT_FNS + if custom_fns: + enrichment_functions = custom_fns + else: + enrichment_functions = DEFAULT_ENRICHMENT_FNS - # run required fields calculation - for enrichment_fn in enrichment_functions: - fn_package_name, fn_method_name = enrichment_fn.rsplit('.', 1) - mod = import_module(fn_package_name) - cluster.log.verbose('Calling fn "%s"' % enrichment_fn) - inventory = getattr(mod, fn_method_name)(inventory, cluster) + # run required fields calculation + for enrichment_fn in enrichment_functions: + fn_package_name, fn_method_name = enrichment_fn.rsplit('.', 1) + mod = import_module(fn_package_name) + cluster.log.verbose('Calling fn "%s"' % enrichment_fn) + inventory = getattr(mod, fn_method_name)(inventory, cluster) cluster.log.verbose('Enrichment finished!') if make_dumps: - inventory_for_dump = 
controlplane.controlplane_finalize_inventory(cluster, prepare_for_dump((inventory))) + inventory_for_dump = controlplane.controlplane_finalize_inventory(cluster, prepare_for_dump(inventory)) utils.dump_file(cluster, yaml.dump(inventory_for_dump, ), "cluster.yaml") return inventory @@ -451,7 +447,7 @@ def compile_inventory(inventory, cluster): # convert references in yaml to normal values iterations = 100 root = deepcopy(inventory) - root['globals'] = cluster.globals + root['globals'] = static.GLOBALS while iterations > 0: @@ -514,6 +510,27 @@ def compile_string(log, struct, root, ignore_jinja_escapes=True): return struct +def escape_jinja_characters_for_inventory(cluster: KubernetesCluster, obj): + if isinstance(obj, dict): + for key, value in obj.items(): + obj[key] = escape_jinja_characters_for_inventory(cluster, value) + elif isinstance(obj, list): + for key, value in enumerate(obj): + obj[key] = escape_jinja_characters_for_inventory(cluster, value) + elif isinstance(obj, str): + obj = _escape_jinja_character(obj) + return obj + + +def _escape_jinja_character(value): + if '{{' in value and '}}' in value and re.search(jinja_query_regex, value): + matches = re.findall(jinja_query_regex, value) + for match in matches: + # TODO: rewrite to correct way of match replacement: now it can cause "{raw}{raw}xxx.." 
circular bug + value = value.replace(match, '{% raw %}'+match+'{% endraw %}') + return value + + def prepare_for_dump(inventory, copy=True): # preparation for dump required to remove memory links diff --git a/kubemarine/core/group.py b/kubemarine/core/group.py index 0bd452d04..44ac34970 100755 --- a/kubemarine/core/group.py +++ b/kubemarine/core/group.py @@ -879,6 +879,12 @@ def get_hosts(self) -> List[str]: members = self.get_ordered_members_list(provide_node_configs=True) return [node['connect_to'] for node in members] + def get_host(self): + if len(self.nodes) != 1: + raise Exception("Cannot get the only host from not a single node") + + return list(self.nodes.keys())[0] + def is_empty(self) -> bool: return not self.nodes @@ -922,38 +928,20 @@ def nodes_amount(self) -> int: """ return len(self.nodes.keys()) - def get_nodes_os(self, suppress_exceptions: bool = False, force_all_nodes: bool = False) -> str: - """ - Returns the detected operating system family for group. If the families are different within the same group, or - the family is unknown, the exception will be thrown or a string result will be returned. - :param suppress_exceptions: Flag, deactivating exception. A string value "unknown" or "multiple" will be - returned instead. - :param force_all_nodes: A flag that forces OS detection from all nodes, not just new ones. - :return: Detected OS family, possible values: "debian", "rhel", "rhel8", "multiple", "unknown". 
- """ - detected_os_family = None - group = self - if not force_all_nodes: - group = self.get_new_nodes_or_self() - for node in group.get_ordered_members_list(provide_node_configs=True): - os_family = self.cluster.context["nodes"][node['connect_to']]["os"]['family'] - if os_family == 'unknown' and not suppress_exceptions: - raise Exception('OS family is unknown') - if not detected_os_family: - detected_os_family = os_family - elif detected_os_family != os_family: - detected_os_family = 'multiple' - if not suppress_exceptions: - raise Exception('OS families differ: detected "%s" and "%s" in same cluster' - % (detected_os_family, os_family)) - return detected_os_family + def get_nodes_os(self) -> str: + """ + Returns the detected operating system family for group. + + :return: Detected OS family, possible values: "debian", "rhel", "rhel8", "multiple", "unknown", "unsupported". + """ + return self.cluster.get_os_family_for_nodes(self.nodes.keys()) def is_multi_os(self) -> bool: """ Returns true if same group contains nodes with multiple OS families :return: Boolean """ - return self.get_nodes_os(suppress_exceptions=True) == 'multiple' + return self.get_nodes_os() == 'multiple' def get_subgroup_with_os(self, os_family: str) -> NodeGroup: """ diff --git a/kubemarine/core/resources.py b/kubemarine/core/resources.py index aa8f99ad3..8f1a764e5 100644 --- a/kubemarine/core/resources.py +++ b/kubemarine/core/resources.py @@ -143,7 +143,7 @@ def create_deviated_cluster(self, deviated_context: dict): def _create_cluster(self, context): log = self.logger() context = deepcopy(context) - default_merger.merge(context, self._get_nodes_context()) + context['nodes'] = deepcopy(self._get_nodes_context()) try: cluster = self._new_cluster_instance(context) cluster.enrich() diff --git a/kubemarine/core/utils.py b/kubemarine/core/utils.py index d93a3e13f..5a1d96730 100755 --- a/kubemarine/core/utils.py +++ b/kubemarine/core/utils.py @@ -381,27 +381,25 @@ def collect_procedure_info(self): 
output = yaml.dump(out) dump_file(context, output, "procedure_parameters") - def collect_info_all_control_plane(self): + def upload_info_new_control_planes(self): """ - This method is used to transfer backup logs from the main control-plane to the new control-plane. + This method is used to transfer backup logs from the initial control-plane to the new control-planes. """ + new_control_planes = self.cluster.nodes['control-plane'].get_new_nodes() + if new_control_planes.is_empty(): + return node = self.cluster.nodes['control-plane'].get_initial_nodes().get_first_member(provide_node_configs=True) control_plane = self.cluster.make_group([node['connect_to']]) data_copy_res = control_plane.sudo(f'tar -czvf /tmp/kubemarine-backup.tar.gz {self.dir_path}') - self.cluster.log.debug('Backup created:\n%s' % data_copy_res) + self.cluster.log.verbose('Backup created:\n%s' % data_copy_res) control_plane.get('/tmp/kubemarine-backup.tar.gz', get_dump_filepath(self.cluster.context, "dump_log_cluster.tar.gz"), 'dump_log_cluster.tar.gz') self.cluster.log.debug('Backup downloaded') - def upload_info_new_node(self): - - new_nodes = self.cluster.nodes['all'].get_new_nodes() - - for new_node in new_nodes.get_ordered_members_list(provide_node_configs=True): + for new_node in new_control_planes.get_ordered_members_list(provide_node_configs=True): group = self.cluster.make_group([new_node['connect_to']]) - if 'control-plane' in new_node['roles']: - group.put(get_dump_filepath(self.cluster.context, "dump_log_cluster.tar.gz"), - "/tmp/dump_log_cluster.tar.gz", sudo=True) - group.sudo(f'tar -C / -xzvf /tmp/dump_log_cluster.tar.gz') + group.put(get_dump_filepath(self.cluster.context, "dump_log_cluster.tar.gz"), + "/tmp/dump_log_cluster.tar.gz", sudo=True) + group.sudo(f'tar -C / -xzvf /tmp/dump_log_cluster.tar.gz') diff --git a/kubemarine/cri/__init__.py b/kubemarine/cri/__init__.py index 467b725f3..170ff8c01 100644 --- a/kubemarine/cri/__init__.py +++ b/kubemarine/cri/__init__.py @@ -11,7 
+11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.group import NodeGroupResult from kubemarine.cri import docker, containerd @@ -39,6 +39,16 @@ def enrich_inventory(inventory, cluster): return inventory +def remove_invalid_cri_config(cluster: KubernetesCluster, inventory: dict): + if inventory['services']['cri']['containerRuntime'] == 'docker': + if inventory['services']['cri'].get('containerdConfig'): + del inventory['services']['cri']['containerdConfig'] + elif inventory['services']['cri'].get('dockerConfig'): + del inventory['services']['cri']['dockerConfig'] + + return inventory + + def install(group): cri_impl = group.cluster.inventory['services']['cri']['containerRuntime'] diff --git a/kubemarine/cri/containerd.py b/kubemarine/cri/containerd.py index 0ebae0951..1e2ddebd2 100755 --- a/kubemarine/cri/containerd.py +++ b/kubemarine/cri/containerd.py @@ -22,12 +22,13 @@ from kubemarine import system, packages from kubemarine.core import utils from kubemarine.core.executor import RemoteExecutor +from kubemarine.core.group import NodeGroup -def install(group): +def install(group: NodeGroup): with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'])['containerd'] + os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'], 'containerd') group.cluster.log.debug("Installing latest containerd and podman on %s node" % node['name']) # always install latest available containerd and podman @@ -42,7 +43,7 @@ def install(group): return exe.get_last_results_str() -def configure(group): +def configure(group: NodeGroup): log = group.cluster.log log.debug("Uploading crictl configuration for 
containerd...") @@ -102,7 +103,7 @@ def configure(group): utils.dump_file(group.cluster, config_string, 'containerd-config.toml') with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'])['containerd'] + os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'], 'containerd') log.debug("Uploading containerd configuration to %s node..." % node['name']) node['connection'].put(StringIO(config_string), os_specific_associations['config_location'], backup=True, sudo=True, mkdir=True) diff --git a/kubemarine/cri/docker.py b/kubemarine/cri/docker.py index 6a79f84d7..74aed30f8 100755 --- a/kubemarine/cri/docker.py +++ b/kubemarine/cri/docker.py @@ -18,12 +18,13 @@ from kubemarine import system, packages from kubemarine.core import utils from kubemarine.core.executor import RemoteExecutor +from kubemarine.core.group import NodeGroup -def install(group): +def install(group: NodeGroup): with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'])['docker'] + os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'], 'docker') packages.install(node['connection'], include=os_specific_associations['package_name']) enable(node['connection']) @@ -39,19 +40,18 @@ def uninstall(group): return packages.remove(group, include=['docker', 'docker-engine', 'docker.io', 'docker-ce']) -def enable(group): - system.enable_service( - group, - name=group.cluster.inventory['services']['packages']['associations']['docker']['service_name'], now=True) +def enable(group: NodeGroup): + # currently it is invoked only for single node + service_name = group.cluster.get_package_association_for_node(group.get_host(), 'docker', 'service_name') + 
system.enable_service(group, name=service_name, now=True) -def disable(group): - system.disable_service( - group, - name=group.cluster.inventory['services']['packages']['associations']['docker']['service_name'], now=True) +def disable(group: NodeGroup): + service_name = group.cluster.get_package_association_for_node(group.get_host(), 'docker', 'service_name') + system.disable_service(group, name=service_name, now=True) -def configure(group): +def configure(group: NodeGroup): log = group.cluster.log settings_json = json.dumps(group.cluster.inventory["services"]['cri']['dockerConfig'], sort_keys=True, indent=4) @@ -59,7 +59,7 @@ def configure(group): with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'])['docker'] + os_specific_associations = group.cluster.get_associations_for_node(node['connect_to'], 'docker') log.debug("Uploading docker configuration to %s node..." % node['name']) node['connection'].put(StringIO(settings_json), os_specific_associations['config_location'], backup=True, sudo=True) diff --git a/kubemarine/demo.py b/kubemarine/demo.py index 3af4b0e50..972df17f2 100644 --- a/kubemarine/demo.py +++ b/kubemarine/demo.py @@ -362,12 +362,14 @@ def _create_connection_from_details(self, ip: str, conn_details: dict, gateway=N ) -def create_silent_context(parser: argparse.ArgumentParser, args: list, procedure: str = None): - args = list(args) +def create_silent_context(args: list = None, parser: argparse.ArgumentParser = None, procedure: str = None): + args = list(args) if args else [] # todo probably increase logging level to get rid of spam in logs. 
if '--disable-dump' not in args: args.append('--disable-dump') + if parser is None: + parser = flow.new_common_parser("Help text") context = flow.create_context(parser, args, procedure=procedure) del context['execution_arguments']['ansible_inventory_location'] context['preserve_inventory'] = False @@ -375,11 +377,26 @@ def create_silent_context(parser: argparse.ArgumentParser, args: list, procedure return context -def new_cluster(inventory, procedure=None, fake=True, context: dict = None, - os_name='centos', os_version='7.9', net_interface='eth0'): +def new_cluster(inventory, procedure_inventory=None, context: dict = None, + fake=True) -> Union[KubernetesCluster, FakeKubernetesCluster]: if context is None: - context = create_silent_context(flow.new_common_parser("Help text"), [], procedure=procedure) + context = create_silent_context() + nodes_context = generate_nodes_context(inventory) + nodes_context.update(context['nodes']) + context['nodes'] = nodes_context + + # It is possible to disable FakeCluster and create real cluster Object for some business case + if fake: + cluster = FakeKubernetesCluster(inventory, context, procedure_inventory=procedure_inventory) + else: + cluster = KubernetesCluster(inventory, context, procedure_inventory=procedure_inventory) + + cluster.enrich() + return cluster + + +def generate_nodes_context(inventory: dict, os_name='centos', os_version='7.9', net_interface='eth0') -> dict: os_family = None if os_name in ['centos', 'rhel']: @@ -387,6 +404,7 @@ def new_cluster(inventory, procedure=None, fake=True, context: dict = None, elif os_name in ['ubuntu', 'debian']: os_family = 'debian' + context = {} for node in inventory['nodes']: node_context = { 'name': node['name'], @@ -405,18 +423,9 @@ def new_cluster(inventory, procedure=None, fake=True, context: dict = None, connect_to = node['internal_address'] if node.get('address'): connect_to = node['address'] - context['nodes'][connect_to] = node_context + context[connect_to] = node_context - 
context['os'] = os_family - - # It is possible to disable FakeCluster and create real cluster Object for some business case - if fake: - cluster = FakeKubernetesCluster(inventory, context) - else: - cluster = KubernetesCluster(inventory, context) - - cluster.enrich() - return cluster + return context def generate_inventory(balancer=1, master=1, worker=1, keepalived=0, haproxy_mntc=0): diff --git a/kubemarine/haproxy.py b/kubemarine/haproxy.py index 2d9da320f..9574dcf56 100644 --- a/kubemarine/haproxy.py +++ b/kubemarine/haproxy.py @@ -37,9 +37,9 @@ def is_maintenance_mode(cluster: KubernetesCluster) -> bool: .get('haproxy', {}).get('maintenance_mode', False)) -def get_associations_for_node(node: dict) -> dict: +def _get_associations_for_node(node: dict) -> dict: conn: NodeGroup = node['connection'] - return conn.cluster.get_associations_for_node(node['connect_to'])['haproxy'] + return conn.cluster.get_associations_for_node(node['connect_to'], 'haproxy') def _is_vrrp_not_bind(vrrp_item: dict): @@ -96,8 +96,10 @@ def enrich_inventory(inventory, cluster): raise Exception("Haproxy maintenance mode should be used when and only when " "there is at least one VRRP IP with 'maintenance-type: not bind'") - if is_mntc_mode: - config_location = get_associations_for_node(node)['config_location'] + group: NodeGroup = node["connection"] + os_family = group.get_nodes_os() + if is_mntc_mode and os_family not in ('unknown', 'unsupported'): + config_location = _get_associations_for_node(node)['config_location'] mntc_config_location = inventory['services']['loadbalancer']['haproxy']['mntc_config_location'] # if 'maintenance_mode' is True then maintenance config and default config must be stored in different files' if mntc_config_location == config_location: @@ -109,7 +111,7 @@ def enrich_inventory(inventory, cluster): def get_config_path(group: NodeGroup) -> NodeGroupResult: with RemoteExecutor(group.cluster) as exe: for node in 
group.get_ordered_members_list(provide_node_configs=True): - package_associations = get_associations_for_node(node) + package_associations = _get_associations_for_node(node) cmd = f"systemctl show -p MainPID {package_associations['service_name']} " \ f"| cut -d '=' -f2 " \ f"| xargs -I PID sudo cat /proc/PID/environ " \ @@ -122,7 +124,7 @@ def get_config_path(group: NodeGroup) -> NodeGroupResult: def install(group): with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - package_associations = get_associations_for_node(node) + package_associations = _get_associations_for_node(node) node['connection'].sudo("%s -v" % package_associations['executable_name'], warn=True) haproxy_installed = True @@ -136,7 +138,7 @@ def install(group): else: with RemoteExecutor(group.cluster) as exe: for node in group.get_ordered_members_list(provide_node_configs=True): - package_associations = get_associations_for_node(node) + package_associations = _get_associations_for_node(node) packages.install(node["connection"], include=package_associations['package_name']) installation_result = exe.get_last_results_str() @@ -154,7 +156,7 @@ def uninstall(group): def restart(group): for node in group.get_ordered_members_list(provide_node_configs=True): - service_name = get_associations_for_node(node)['service_name'] + service_name = _get_associations_for_node(node)['service_name'] system.restart_service(node['connection'], name=service_name) RemoteExecutor(group.cluster).flush() group.cluster.log.debug("Sleep while haproxy comes-up...") @@ -165,14 +167,14 @@ def restart(group): def disable(group): with RemoteExecutor(group.cluster): for node in group.get_ordered_members_list(provide_node_configs=True): - service_name = get_associations_for_node(node)['service_name'] + service_name = _get_associations_for_node(node)['service_name'] system.disable_service(node['connection'], name=service_name) def enable(group): with 
RemoteExecutor(group.cluster): for node in group.get_ordered_members_list(provide_node_configs=True): - service_name = get_associations_for_node(node)['service_name'] + service_name = _get_associations_for_node(node)['service_name'] system.enable_service(node['connection'], name=service_name, now=True) @@ -206,7 +208,7 @@ def configure(group: NodeGroup): all_nodes_configs = cluster.nodes['all'].get_final_nodes().get_ordered_members_list(provide_node_configs=True) for node in group.get_ordered_members_list(provide_node_configs=True): - package_associations = get_associations_for_node(node) + package_associations = _get_associations_for_node(node) configs_directory = '/'.join(package_associations['config_location'].split('/')[:-1]) cluster.log.debug("\nConfiguring haproxy on \'%s\'..." % node['name']) @@ -227,12 +229,15 @@ def configure(group: NodeGroup): node['connection'].sudo('ls -la %s' % mntc_config_location) -def override_haproxy18(group): +def override_haproxy18(group: NodeGroup): rhel_nodes = group.get_subgroup_with_os('rhel') if rhel_nodes.is_empty(): group.cluster.log.debug('Haproxy18 override is not required') return - package_associations = group.cluster.get_associations_for_os('rhel')['haproxy'] + + # Any node in group has rhel OS family, so association can be fetched from any node. 
+ any_host = rhel_nodes.get_first_member().get_host() + package_associations = group.cluster.get_associations_for_node(any_host, 'haproxy') # TODO: Do not replace the whole file, replace only parameter return group.put(io.StringIO("CONFIG=%s\n" % package_associations['config_location']), '/etc/sysconfig/%s' % package_associations['service_name'], backup=True, sudo=True) diff --git a/kubemarine/keepalived.py b/kubemarine/keepalived.py index 2cb56d95c..63912bc05 100644 --- a/kubemarine/keepalived.py +++ b/kubemarine/keepalived.py @@ -147,10 +147,15 @@ def enrich_inventory_calculate_nodegroup(inventory, cluster): return inventory -def install(group): - log = group.cluster.log +def install(group: NodeGroup): + cluster = group.cluster + log = cluster.log - package_associations = group.cluster.inventory['services']['packages']['associations']['keepalived'] + # todo why check and try to install all keepalives but finally filter out only new nodes? + group = group.get_new_nodes_or_self() + # todo consider probably different associations for nodes with different OS families + any_host = group.get_first_member().get_host() + package_associations = cluster.get_associations_for_node(any_host, 'keepalived') keepalived_version = group.sudo("%s -v" % package_associations['executable_name'], warn=True) keepalived_installed = True @@ -163,9 +168,9 @@ def install(group): log.debug("Keepalived already installed, nothing to install") installation_result = keepalived_version else: - installation_result = packages.install(group.get_new_nodes_or_self(), include=package_associations['package_name']) + installation_result = packages.install(group, include=package_associations['package_name']) - service_name = group.cluster.inventory['services']['packages']['associations']['keepalived']['service_name'] + service_name = package_associations['service_name'] patch_path = utils.get_resource_absolute_path("./resources/drop_ins/keepalived.conf", script_relative=True) 
group.call(system.patch_systemd_service, service_name=service_name, patch_source=patch_path) group.call(install_haproxy_check_script) @@ -184,30 +189,31 @@ def uninstall(group): return packages.remove(group, include='keepalived') -def restart(group): +def restart(group: NodeGroup): results = NodeGroupResult(group.cluster) for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to']) - package_associations = os_specific_associations['keepalived'] - results.update(system.restart_service(node['connection'], name=package_associations['service_name'])) + service_name = group.cluster.get_package_association_for_node( + node['connect_to'], 'keepalived', 'service_name') + results.update(system.restart_service(node['connection'], name=service_name)) group.cluster.log.debug("Sleep while keepalived comes-up...") time.sleep(group.cluster.globals['keepalived']['restart_wait']) return results -def enable(group): +def enable(group: NodeGroup): with RemoteExecutor(group.cluster): for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to']) - system.enable_service(node['connection'], name=os_specific_associations['keepalived']['service_name'], - now=True) + service_name = group.cluster.get_package_association_for_node( + node['connect_to'], 'keepalived', 'service_name') + system.enable_service(node['connection'], name=service_name, now=True) -def disable(group): +def disable(group: NodeGroup): with RemoteExecutor(group.cluster): for node in group.get_ordered_members_list(provide_node_configs=True): - os_specific_associations = group.cluster.get_associations_for_node(node['connect_to']) - system.disable_service(node['connection'], name=os_specific_associations['keepalived']['service_name']) + service_name = group.cluster.get_package_association_for_node( + node['connect_to'], 'keepalived', 
'service_name') + system.disable_service(node['connection'], name=service_name) def generate_config(inventory, node): @@ -253,7 +259,7 @@ def configure(group: NodeGroup) -> NodeGroupResult: log.debug("Configuring keepalived on '%s'..." % node['name']) - package_associations = group.cluster.get_associations_for_node(node['connect_to'])['keepalived'] + package_associations = group.cluster.get_associations_for_node(node['connect_to'], 'keepalived') configs_directory = '/'.join(package_associations['config_location'].split('/')[:-1]) group.sudo('mkdir -p %s' % configs_directory, hide=True) diff --git a/kubemarine/kubernetes/__init__.py b/kubemarine/kubernetes/__init__.py index 9f36c22c6..046689fca 100644 --- a/kubemarine/kubernetes/__init__.py +++ b/kubemarine/kubernetes/__init__.py @@ -16,7 +16,7 @@ import math import time from copy import deepcopy -from typing import List +from typing import List, Dict, Union import ruamel.yaml import yaml @@ -30,6 +30,11 @@ version_coredns_path_breakage = "v1.21.2" +ERROR_DOWNGRADE='Kubernetes old version \"%s\" is greater than new one \"%s\"' +ERROR_SAME='Kubernetes old version \"%s\" is the same as new one \"%s\"' +ERROR_MAJOR_RANGE_EXCEEDED='Major version \"%s\" rises to new \"%s\" more than one' +ERROR_MINOR_RANGE_EXCEEDED='Minor version \"%s\" rises to new \"%s\" more than one' + def add_node_enrichment(inventory, cluster): if cluster.context.get('initial_procedure') != 'add_node': @@ -848,11 +853,13 @@ def prepare_drain_command(node, version: str, globals, disable_eviction: bool, n def upgrade_cri_if_required(group): - log = group.cluster.log - cri_impl = group.cluster.inventory['services']['cri']['containerRuntime'] + # currently it is invoked only for single node + cluster = group.cluster + log = cluster.log + cri_impl = cluster.inventory['services']['cri']['containerRuntime'] - if cri_impl in group.cluster.context["packages"]["upgrade_required"]: - cri_packages = 
group.cluster.inventory['services']['packages']['associations'][cri_impl]['package_name'] + if cri_impl in cluster.context["packages"]["upgrade_required"]: + cri_packages = cluster.get_package_association_for_node(group.get_host(), cri_impl, 'package_name') log.debug(f"Installing {cri_packages}") packages.install(group, include=cri_packages) @@ -935,7 +942,8 @@ def expect_kubernetes_version(cluster, version, timeout=None, retries=None, node raise Exception('In the expected time, the nodes did not receive correct Kubernetes version') -def test_version(version): +def test_version(version: Union[list, str]): + version_list: list = version # catch version without "v" at the first symbol if isinstance(version, str): if not version.startswith('v'): @@ -959,34 +967,30 @@ def test_version(version): def test_version_upgrade_possible(old, new, skip_equal=False): - versions = { + versions_unchanged = { 'old': old.strip(), 'new': new.strip() } - versions_unchanged = versions.copy() + versions: Dict[str, List[int]] = {} - for v_type, version in versions.items(): + for v_type, version in versions_unchanged.items(): versions[v_type] = test_version(version) # test new is greater than old if tuple(versions['old']) > tuple(versions['new']): - raise Exception('Kubernetes old version \"%s\" is greater than new one \"%s\"' - % (versions_unchanged['old'], versions_unchanged['new'])) + raise Exception(ERROR_DOWNGRADE % (versions_unchanged['old'], versions_unchanged['new'])) # test new is the same as old if tuple(versions['old']) == tuple(versions['new']) and not skip_equal: - raise Exception('Kubernetes old version \"%s\" is the same as new one \"%s\"' - % (versions_unchanged['old'], versions_unchanged['new'])) + raise Exception(ERROR_SAME % (versions_unchanged['old'], versions_unchanged['new'])) # test major step is not greater than 1 if versions['new'][0] - versions['old'][0] > 1: - raise Exception('Major version \"%s\" rises to new \"%s\" more than one' - % (versions_unchanged['old'], 
versions_unchanged['new'])) + raise Exception(ERROR_MAJOR_RANGE_EXCEEDED % (versions_unchanged['old'], versions_unchanged['new'])) # test minor step is not greater than 1 if versions['new'][1] - versions['old'][1] > 1: - raise Exception('Minor version \"%s\" rises to new \"%s\" more than one' - % (versions_unchanged['old'], versions_unchanged['new'])) + raise Exception(ERROR_MINOR_RANGE_EXCEEDED % (versions_unchanged['old'], versions_unchanged['new'])) def recalculate_proper_timeout(nodes, timeout): diff --git a/kubemarine/packages.py b/kubemarine/packages.py index bc4d0eec1..91788a956 100644 --- a/kubemarine/packages.py +++ b/kubemarine/packages.py @@ -12,64 +12,196 @@ # See the License for the specific language governing permissions and # limitations under the License. -from copy import deepcopy from typing import List, Dict -from kubemarine import yum, system, apt +from kubemarine import yum, apt +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.executor import RemoteExecutor from kubemarine.core.group import NodeGroup, NodeGroupResult +from kubemarine.core.yaml_merger import default_merger -def enrich_inventory_associations(inventory, cluster): - os_family = system.get_os_family(cluster) +ERROR_GLOBAL_ASSOCIATIONS_REDEFINED_MULTIPLE_OS = \ + "It is not supported to customize services.packages.associations section " \ + "if nodes have different OS families. " \ + "Please move the section to corresponding services.packages.associations. section." - associations = inventory['services']['packages']['associations'] - if not associations.get(os_family): - # already enriched +ERROR_MULTIPLE_PACKAGE_VERSIONS_DETECTED = \ + "Multiple package versions detected %s for package '%s'. " \ + "Align them to the single version manually or using corresponding task of install procedure. " \ + "Alternatively, specify cache_versions=false for corresponding association." 
+ + +def enrich_inventory_associations(inventory, cluster: KubernetesCluster): + associations: dict = inventory['services']['packages']['associations'] + os_propagated_associations = {} + + # move associations for OS families as-is + for association_name in get_associations_os_family_keys(): + os_propagated_associations[association_name] = associations.pop(association_name) + + inventory['services']['packages']['associations'] = os_propagated_associations + + # Check remained associations section if they are customized at global level. + if associations: + os_family = cluster.get_os_family() + if os_family == 'multiple': + raise Exception(ERROR_GLOBAL_ASSOCIATIONS_REDEFINED_MULTIPLE_OS) + elif os_family not in ('unknown', 'unsupported'): + # move remained associations properties to the specific OS family section and merge with priority + default_merger.merge(os_propagated_associations[os_family], associations) + + return inventory + + +def cache_package_versions(cluster: KubernetesCluster, inventory: dict, ensured_associations_only=False) -> dict: + os_ids = cluster.get_os_identifiers() + different_os = list(set(os_ids.values())) + if len(different_os) > 1: + cluster.log.debug(f"Final nodes have different OS families or versions, packages will not be cached. 
" + f"List of (OS family, version): {different_os}") return inventory - os_specific_associations = deepcopy(associations[os_family]) - # Cache packages versions only if the option is set in configuration, so we cut the version from 'package_name' - if not cluster.inventory['services']['packages']['cache_versions']: - for association in os_specific_associations: - if type(os_specific_associations[association]['package_name']) is list: - for item, package in enumerate(os_specific_associations[association]['package_name']): - if os_family in ["rhel", "rhel8"]: - os_specific_associations[association]['package_name'][item] = \ - os_specific_associations[association]['package_name'][item].split('-{{')[0] - else: - os_specific_associations[association]['package_name'][item] = \ - os_specific_associations[association]['package_name'][item].split('={{')[0] - elif type(os_specific_associations[association]['package_name']) is str: - if os_family in ["rhel", "rhel8"]: - os_specific_associations[association]['package_name'] = \ - os_specific_associations[association]['package_name'].split('-{{')[0] - else: - os_specific_associations[association]['package_name'] = \ - os_specific_associations[association]['package_name'].split('={{')[0] - else: - raise Exception('Unexpected value for association') + os_family = different_os[0][0] + if os_family in ('unknown', 'unsupported'): + # For add_node/install procedures we check that OS is supported in prepare.check.system task. + # For check_iaas procedure it is allowed to have unsupported OS, so skip caching. + cluster.log.debug("Skip caching of packages for unsupported OS.") + return inventory + + nodes_cache_versions = cluster.nodes['all'].get_final_nodes().get_sudo_nodes() + if nodes_cache_versions.is_empty(): + # For add_node/install procedures we check that all nodes are sudoers in prepare.check.sudoer task. + # For check_iaas procedure the nodes might still be not sudoers, so skip caching. 
+ cluster.log.debug(f"There are no nodes with sudo privileges, packages will not be cached.") + return inventory + + packages_list = _get_packages_to_detect_versions(cluster, inventory, ensured_associations_only) + detected_packages = detect_installed_packages_version_groups(nodes_cache_versions, packages_list) + + _cache_package_associations(cluster, inventory, detected_packages, ensured_associations_only) + _cache_custom_packages(cluster, inventory, detected_packages, ensured_associations_only) + + cluster.log.debug('Package versions detection finished') + return inventory + + +def _get_associations(cluster: KubernetesCluster, inventory: dict): + return inventory['services']['packages']['associations'][cluster.get_os_family()] - else: - # set 'skip_caching' for customer association - if cluster.raw_inventory.get('services', {}).get('packages', {}).get('associations', {}): - for package in cluster.raw_inventory['services']['packages']['associations']: - os_specific_associations[package]['skip_caching'] = "true" - os_specific_associations['debian'] = deepcopy(associations['debian']) - os_specific_associations['rhel'] = deepcopy(associations['rhel']) - os_specific_associations['rhel8'] = deepcopy(associations['rhel8']) +def _get_package_names_for_association(cluster: KubernetesCluster, inventory: dict, association_name: str) -> list: + if association_name in get_associations_os_family_keys(): + return [] - for association_name, properties in associations.items(): - if association_name in os_specific_associations.keys(): - for key, value in properties.items(): - os_specific_associations[association_name][key] = value + associated_packages = _get_associations(cluster, inventory)[association_name].get('package_name') + if isinstance(associated_packages, str): + associated_packages = [associated_packages] + elif not isinstance(associated_packages, list): + raise Exception('Unsupported associated packages object type') - inventory['services']['packages']['associations'] 
= os_specific_associations + return associated_packages + + +def _get_packages_for_associations_to_detect(cluster: KubernetesCluster, inventory: dict, association_name: str, + ensured_association_only: bool) -> list: + packages_list = _get_package_names_for_association(cluster, inventory, association_name) + if not packages_list: + return [] + + global_cache_versions = inventory['services']['packages']['cache_versions'] + associated_params = _get_associations(cluster, inventory)[association_name] + if not ensured_association_only or (global_cache_versions and associated_params.get('cache_versions', True)): + return packages_list + + return [] + + +def _get_packages_to_detect_versions(cluster: KubernetesCluster, inventory: dict, ensured_association_only: bool) -> list: + packages_list = [] + for association_name in _get_associations(cluster, inventory).keys(): + packages_list.extend(_get_packages_for_associations_to_detect( + cluster, inventory, association_name, ensured_association_only)) + + if not ensured_association_only and inventory['services']['packages'].get('install', {}): + packages_list.extend(inventory['services']['packages']['install']['include']) + + return packages_list + + +def _cache_package_associations(cluster: KubernetesCluster, inventory: dict, + detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool): + for association_name, associated_params in _get_associations(cluster, inventory).items(): + packages_list = _get_packages_for_associations_to_detect( + cluster, inventory, association_name, ensured_association_only) + if not packages_list: + continue + + final_packages_list = [] + for package in packages_list: + final_package = _detect_final_package(cluster, detected_packages, package, ensured_association_only) + final_packages_list.append(final_package) + + # if non-multiple value, then convert to simple string + # packages can contain multiple package values, like docker package + # (it has docker-ce, docker-cli and 
containerd.io packages for installation) + if len(final_packages_list) == 1: + final_packages_list = final_packages_list[0] + + associated_params['package_name'] = final_packages_list + + +def _cache_custom_packages(cluster: KubernetesCluster, inventory: dict, + detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool): + if ensured_association_only: + return + # packages from direct installation section + custom_install_packages = inventory['services']['packages'].get('install', {}) + if custom_install_packages: + final_packages_list = [] + for package in custom_install_packages['include']: + final_package = _detect_final_package(cluster, detected_packages, package, False) + final_packages_list.append(final_package) + custom_install_packages['include'] = final_packages_list + return detected_packages + + +def _detect_final_package(cluster: KubernetesCluster, detected_packages: Dict[str, Dict[str, List]], + package: str, ensured_association_only: bool) -> str: + # add package version to list only if it was found as installed + detected_package_versions = list(filter(lambda version: "not installed" not in version, + detected_packages[package].keys())) + + # if there no versions detected, then return default package from inventory + if not detected_package_versions: + return package + elif len(detected_package_versions) > 1: + if ensured_association_only: + raise Exception(ERROR_MULTIPLE_PACKAGE_VERSIONS_DETECTED % (str(detected_packages[package]), package)) + else: + cluster.log.warning( + f"Multiple package versions detected {detected_packages[package]} for package '{package}'. 
" + f"Use default package '{package}' from inventory.") + # return default package from inventory if multiple versions detected + return package + else: + return detected_package_versions[0] + + +def remove_unused_os_family_associations(cluster: KubernetesCluster, inventory: dict): + final_nodes = cluster.nodes['all'].get_final_nodes() + for os_family in get_associations_os_family_keys(): + # Do not remove OS family associations section in finalized inventory if any node has this OS family. + if final_nodes.get_subgroup_with_os(os_family).is_empty(): + del inventory['services']['packages']['associations'][os_family] return inventory +def get_associations_os_family_keys(): + return {'debian', 'rhel', 'rhel8'} + + def get_package_manager(group: NodeGroup) -> apt or yum: os_family = group.get_nodes_os() @@ -109,12 +241,23 @@ def upgrade(group: NodeGroup, include=None, exclude=None, **kwargs) -> NodeGroup return get_package_manager(group).upgrade(group, include, exclude, **kwargs) -def detect_installed_package_version(group: NodeGroup, package: str, warn=True) -> NodeGroupResult: +def get_detect_package_version_cmd(os_family: str, package_name: str) -> str: + if os_family in ["rhel", "rhel8"]: + cmd = r"rpm -q %s" % package_name + else: + cmd = r"dpkg-query -f '${Package}=${Version}\n' -W %s" % package_name + + # This is WA for RemoteExecutor, since any package failed others are not checked + # TODO: get rid of this WA and use warn=True in sudo + cmd += ' || true' + return cmd + + +def detect_installed_package_version(group: NodeGroup, package: str) -> NodeGroupResult: """ Detect package versions for each host on remote group :param group: Group of nodes, where package should be found - :param package: package name, which version should be detected (eg. 'podman' and 'containerd' without any version suggestion) - :param warn: Suppress exception for non-found packages + :param package: package name, which version should be detected (eg. 
'podman' and 'containerd') :return: NodeGroupResults with package version on each host Method generates different package query for different OS. @@ -126,110 +269,54 @@ def detect_installed_package_version(group: NodeGroup, package: str, warn=True) os_family = group.get_nodes_os() package_name = get_package_name(os_family, package) - if os_family in ["rhel", "rhel8"]: - cmd = r"rpm -q %s" % package_name - else: - cmd = r"dpkg-query -f '${Package}=${Version}\n' -W %s" % package_name - - # This is WA for RemoteExecutor, since any package failed others are not checked - # TODO: get rid of this WA and use warn=True in sudo - if warn: - cmd += ' || true' - + cmd = get_detect_package_version_cmd(os_family, package_name) return group.sudo(cmd) -def detect_installed_packages_versions(group: NodeGroup, packages_list: List or str = None) -> Dict[str, NodeGroupResult]: +def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str) -> Dict[str, Dict[str, List]]: """ - Detect packages versions for each host on remote group from specified list of packages + Detect grouped packages versions on remote group from specified list of packages. :param group: Group of nodes, where packages should be found - :param packages_list: Single package or list of packages, which versions should be detected. If packages list empty, - then packages will be automatically added from services.packages.associations and services.packages.install.include - :return: Dictionary with NodeGroupResults for each queried package, e.g. "foo" -> {1.1.1.1:"foo-1", 1.1.1.2:"foo-2"} + :param packages_list: Single package or list of packages, which versions should be detected. + :return: Dictionary with grouped versions for each queried package, pointing to list of hosts, + e.g. 
{"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}} """ cluster = group.cluster - excluded_dict = {} - if not packages_list: - packages_list = [] - # packages from associations - for association_name, associated_params in cluster.inventory['services']['packages']['associations'].items(): - associated_packages = associated_params.get('package_name', []) - if isinstance(associated_packages, str): - packages_list.append(get_package_name(group.get_nodes_os(), associated_packages)) - else: - associated_packages_clean = [] - for package in associated_packages: - associated_packages_clean.append(get_package_name(group.get_nodes_os(), package)) - packages_list = packages_list + associated_packages_clean - if associated_params.get('skip_caching', False): - # replace packages with associated version that shoud be excluded from cache - for excluded_package in associated_params['package_name']: - excluded_dict[get_package_name(group.get_nodes_os(), excluded_package)] = excluded_package - - # dedup + if isinstance(packages_list, str): + packages_list = [packages_list] + # deduplicate packages_list = list(set(packages_list)) + if not packages_list: + return {} with RemoteExecutor(cluster) as exe: for package in packages_list: - package_name = get_package_name(group.get_nodes_os(), package) - detect_installed_package_version(group, package_name, warn=True) + detect_installed_package_version(group, package) raw_result = exe.get_last_results() - results: dict[str, NodeGroupResult] = {} + results: Dict[str, Dict[str, List]] = {} for i, package in enumerate(packages_list): - results[package] = NodeGroupResult(cluster) - for host, multiple_results in raw_result.items(): + detected_grouped_packages = {} + for conn, multiple_results in raw_result.items(): node_detected_package = multiple_results[i].stdout.strip() + multiple_results[i].stderr.strip() - if "not installed" in node_detected_package or "no packages found" in node_detected_package: + # consider 
version, which ended with special symbol = or - as not installed + # (it is possible in some cases to receive "containerd=" version) + if "not installed" in node_detected_package or "no packages found" in node_detected_package \ + or node_detected_package[-1] == '=' or node_detected_package[-1] == '-': node_detected_package = f"not installed {package}" - else: - if package in excluded_dict.keys(): - node_detected_package = excluded_dict[package] - results[package][host] = node_detected_package - - return results - - -def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str = None) \ - -> Dict[str, Dict[str, List]]: - """ - Detect grouped packages versions on remote group from specified list of packages. - :param group: Group of nodes, where packages should be found - :param packages_list: Single package or list of packages, which versions should be detected. If packages list empty, - then packages will be automatically added from services.packages.associations and services.packages.install.include - :return: Dictionary with grouped versions for each queried package, pointing to list of hosts, - e.g. {"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}} - """ + detected_grouped_packages.setdefault(node_detected_package, []).append(conn.host) - detected_packages = detect_installed_packages_versions(group, packages_list) - grouped_packages: Dict[str, Dict[str, List]] = {} - for queried_package, detected_packages_results in detected_packages.items(): - detected_grouped_packages = {} - for host, packages in detected_packages_results.items(): - if '\n' in packages: - # this is the test, when package name contains multiple names, - # e.g. 
docker-ce and docker-cli for "docker-ce-*" query - packages = packages.split('\n') - else: - packages = [packages] - - for pckg in packages: - if pckg not in detected_grouped_packages: - detected_grouped_packages[pckg] = [host] - else: - detected_grouped_packages[pckg].append(host) + results[package] = detected_grouped_packages - grouped_packages[queried_package] = detected_grouped_packages - - return grouped_packages + return results def get_package_name(os_family: str, package: str) -> str: """ - Return the pure package name, whithout any part of version + Return the pure package name, without any part of version """ import re diff --git a/kubemarine/procedures/add_node.py b/kubemarine/procedures/add_node.py index 06dcf89e8..a257a77f6 100755 --- a/kubemarine/procedures/add_node.py +++ b/kubemarine/procedures/add_node.py @@ -17,9 +17,10 @@ import copy from collections import OrderedDict -from kubemarine import kubernetes, system +from kubemarine import kubernetes, packages from kubemarine.core import flow, utils from kubemarine.core.action import Action +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.resources import DynamicResources from kubemarine.procedures import install @@ -92,27 +93,13 @@ def add_node_finalize_inventory(cluster, inventory_to_finalize): return inventory_to_finalize -def cache_installed_packages(cluster): +def cache_installed_packages(cluster: KubernetesCluster): """ Task which is used to collect already installed packages versions on already existing nodes. It is called first during "add_node" procedure, so that new nodes install exactly the same packages as on other already existing nodes. 
""" - unique_os = set(((cluster.context['nodes'][node['connect_to']]['os']['family']), (cluster.context['nodes'][node['connect_to']]['os']['version'])) - for node in cluster.nodes['all'].get_ordered_members_list(provide_node_configs=True)) - different_os = len(unique_os) - - if different_os > 1: - cluster.log.debug(f"New node has different OS" - f" than some other nodes, " - "packages will not be cached.") - cluster.log.debug("Count different OS %d ", different_os) - cluster.log.debug("Unique OS %s ", unique_os) - - return - # Cache packages only if it's set in configuration - if cluster.inventory['services']['packages']['cache_versions']: - cluster.cache_package_versions() + packages.cache_package_versions(cluster, cluster.inventory, ensured_associations_only=True) tasks = OrderedDict(copy.deepcopy(install.tasks)) diff --git a/kubemarine/procedures/backup.py b/kubemarine/procedures/backup.py index 99dd782d7..05447be93 100755 --- a/kubemarine/procedures/backup.py +++ b/kubemarine/procedures/backup.py @@ -25,18 +25,16 @@ from collections import OrderedDict import yaml -from kubemarine import system from kubemarine.core import utils, flow from kubemarine.core.action import Action from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.group import NodeGroup from kubemarine.core.resources import DynamicResources -from kubemarine.procedures import install -def get_default_backup_files_list(cluster): - haproxy_service = cluster.inventory['services']['packages']['associations']['haproxy']['service_name'] - keepalived_service = cluster.inventory['services']['packages']['associations']['keepalived']['service_name'] +def get_default_backup_files_list(cluster: KubernetesCluster): + haproxy_service = cluster.get_package_association('haproxy', 'service_name') + keepalived_service = cluster.get_package_association('keepalived', 'service_name') backup_files_list = [ "/etc/resolv.conf", @@ -91,9 +89,9 @@ def export_ansible_inventory(cluster): 
cluster.log.verbose('ansible-inventory.ini exported to backup') -def export_packages_list(cluster): +def export_packages_list(cluster: KubernetesCluster): cluster.context['backup_descriptor']['nodes']['packages'] = {} - if system.get_os_family(cluster) in ['rhel', 'rhel8']: + if cluster.get_os_family() in ['rhel', 'rhel8']: cmd = r"rpm -qa" else: cmd = r"dpkg-query -f '${Package}=${Version}\n' -W" diff --git a/kubemarine/procedures/check_paas.py b/kubemarine/procedures/check_paas.py index ae72f863d..9b05325a1 100755 --- a/kubemarine/procedures/check_paas.py +++ b/kubemarine/procedures/check_paas.py @@ -36,13 +36,13 @@ from deepdiff import DeepDiff -def services_status(cluster, service_type): +def services_status(cluster: KubernetesCluster, service_type: str): with TestCase(cluster.context['testsuite'], '201', "Services", "%s Status" % service_type.capitalize(), default_results='active (running)'): service_name = service_type - if cluster.inventory['services']['packages']['associations'].get(service_type): - service_name = cluster.inventory['services']['packages']['associations'][service_type]['service_name'] + if cluster.get_os_family() != 'multiple' and service_type != 'kubelet': + service_name = cluster.get_package_association(service_type, 'service_name') group = cluster.nodes['all'] if service_type == 'haproxy': @@ -92,7 +92,17 @@ def services_status(cluster, service_type): hint="Fix the service to be enabled and has running status.") -def recommended_system_packages_versions(cluster): +def _check_same_os(cluster: KubernetesCluster): + os_ids = cluster.get_os_identifiers() + different_os = set(os_ids.values()) + if len(different_os) > 1: + cluster.log.warning( + f"Nodes have different OS families or versions, packages versions cannot be checked. 
" + f"List of (OS family, version): {list(different_os)}") + raise TestFailure(f"Nodes have different OS families or versions") + + +def recommended_system_packages_versions(cluster: KubernetesCluster): """ Task that checks if configured "system" packages versions are compatible with the configured k8s version and OS. Fails if unable to detect the OS family. @@ -138,7 +148,7 @@ def recommended_system_packages_versions(cluster): good_results = set() bad_results = [] for package_alias, expected_packages in expected_system_packages.items(): - actual_packages = cluster.inventory["services"]["packages"]["associations"][package_alias]["package_name"] + actual_packages = cluster.get_package_association(package_alias, "package_name") if not isinstance(actual_packages, list): actual_packages = [actual_packages] for expected_pckg, version in expected_packages.items(): @@ -164,7 +174,7 @@ def recommended_system_packages_versions(cluster): tc.success("all packages have recommended versions") -def system_packages_versions(cluster, pckg_alias): +def system_packages_versions(cluster: KubernetesCluster, pckg_alias: str): """ Verifies that system packages are installed on required nodes and have equal versions. Failure is shown if check is not successful. @@ -172,6 +182,7 @@ def system_packages_versions(cluster, pckg_alias): :param pckg_alias: system package alias to retrieve "package_name" association. 
""" with TestCase(cluster.context['testsuite'], '205', "Services", f"{pckg_alias} version") as tc: + _check_same_os(cluster) if pckg_alias == "docker" or pckg_alias == "containerd": group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) elif pckg_alias == "keepalived" or pckg_alias == "haproxy": @@ -182,18 +193,19 @@ def system_packages_versions(cluster, pckg_alias): else: raise Exception(f"Unknown system package alias: {pckg_alias}") - packages = cluster.inventory['services']['packages']['associations'][pckg_alias]['package_name'] + packages = cluster.get_package_association(pckg_alias, 'package_name') if not isinstance(packages, list): packages = [packages] return check_packages_versions(cluster, tc, group, packages) -def generic_packages_versions(cluster): +def generic_packages_versions(cluster: KubernetesCluster): """ Verifies that user-provided packages are installed on required nodes and have equal versions. Warning is shown if check is not successful. """ with TestCase(cluster.context['testsuite'], '206', "Services", f"Generic packages version") as tc: + _check_same_os(cluster) packages = cluster.inventory['services']['packages']['install']['include'] return check_packages_versions(cluster, tc, cluster.nodes['all'], packages, warn_on_bad_result=True) @@ -571,7 +583,7 @@ def verify_selinux_status(cluster: KubernetesCluster) -> None: :param cluster: KubernetesCluster object :return: None """ - if system.get_os_family(cluster) == 'debian': + if cluster.get_os_family() == 'debian': return with TestCase(cluster.context['testsuite'], '213', "Security", "Selinux security policy") as tc: @@ -630,7 +642,7 @@ def verify_selinux_config(cluster: KubernetesCluster) -> None: :param cluster: KubernetesCluster object :return: None """ - if system.get_os_family(cluster) == 'debian': + if cluster.get_os_family() == 'debian': return with TestCase(cluster.context['testsuite'], '214', "Security", "Selinux configuration") as tc: diff --git 
a/kubemarine/procedures/install.py b/kubemarine/procedures/install.py index 88148ced0..914d063d2 100755 --- a/kubemarine/procedures/install.py +++ b/kubemarine/procedures/install.py @@ -15,20 +15,57 @@ from collections import OrderedDict +from typing import Callable + import fabric import yaml import os import io from kubemarine.core.action import Action +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.errors import KME from kubemarine import system, sysctl, haproxy, keepalived, kubernetes, plugins, \ kubernetes_accounts, selinux, thirdparties, admission, audit, coredns, cri, packages, apparmor from kubemarine.core import flow, utils from kubemarine.core.executor import RemoteExecutor +from kubemarine.core.group import NodeGroup from kubemarine.core.resources import DynamicResources +def _applicable_for_new_nodes_with_roles(*roles): + """ + Decorator to annotate installation methods. + If there are no new nodes with the specified roles to be added / installed to the cluster, + the decorator skips execution of the method. + Otherwise, it runs the annotated method with the calculated group of nodes with the specified roles. + Note that the signature of annotated method should be f(NodeGroup), + but the resulting wrapping method will be f(KubernetesCluster). + + :param roles: roles of nodes for which the annotated method is applicable. + :return: new wrapping method. + """ + if not roles: + raise Exception(f'Roles are not defined') + + def roles_wrapper(fn: Callable[[NodeGroup], None]): + def cluster_wrapper(cluster: KubernetesCluster): + candidate_group = cluster.nodes['all'].get_new_nodes_or_self() + group = cluster.make_group([]) + for role in roles: + group = group.include_group(cluster.nodes.get(role)) + group = group.intersection_group(candidate_group) + if not group.is_empty(): + fn(group) + else: + fn_name = fn.__module__ + '.' 
+ fn.__qualname__ + cluster.log.debug(f"Skip running {fn_name} as no new node with roles {roles} has been found.") + + return cluster_wrapper + + return roles_wrapper + + def system_prepare_check_sudoer(cluster): not_sudoers = [] for host, node_context in cluster.context['nodes'].items(): @@ -42,11 +79,12 @@ def system_prepare_check_sudoer(cluster): raise KME("KME0005", hostnames=not_sudoers) -def system_prepare_check_system(cluster): - group = cluster.nodes['all'].get_new_nodes_or_self() +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_check_system(group: NodeGroup): + cluster = group.cluster cluster.log.debug(system.fetch_os_versions(cluster)) for address, context in cluster.context["nodes"].items(): - if address not in group.nodes or not context.get('os'): + if address not in group.nodes: continue if context["os"]["family"] == "unsupported": raise Exception('%s host operating system is unsupported' % address) @@ -69,73 +107,85 @@ def system_prepare_check_cluster_installation(cluster): cluster.log.debug('There is no any installed cluster') -def system_prepare_system_chrony(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_chrony(group: NodeGroup): + cluster = group.cluster if cluster.inventory['services']['ntp'].get('chrony', {}).get('servers') is None: cluster.log.debug("Skipped - NTP servers from chrony is not defined in config file") return - cluster.nodes['all'].get_new_nodes_or_self().call(system.configure_chronyd) + group.call(system.configure_chronyd) -def system_prepare_system_timesyncd(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_timesyncd(group: NodeGroup): + cluster = group.cluster if not cluster.inventory['services']['ntp'].get('timesyncd', {}).get('Time', {}).get('NTP') and \ not cluster.inventory['services']['ntp'].get('timesyncd', {}).get('Time', {}).get('FallbackNTP'): cluster.log.debug("Skipped - NTP servers from timesyncd is not defined in config file") 
return - cluster.nodes['all'].get_new_nodes_or_self().call(system.configure_timesyncd) + group.call(system.configure_timesyncd) -def system_prepare_system_sysctl(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_sysctl(group: NodeGroup): + cluster = group.cluster if cluster.inventory['services'].get('sysctl') is None or not cluster.inventory['services']['sysctl']: cluster.log.debug("Skipped - sysctl is not defined or empty in config file") return - cluster.nodes['all'].get_new_nodes_or_self().call_batch([ + group.call_batch([ sysctl.configure, sysctl.reload, ]) -def system_prepare_system_setup_selinux(cluster): - cluster.nodes['all'].get_new_nodes_or_self().call(selinux.setup_selinux) +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_setup_selinux(group: NodeGroup): + group.call(selinux.setup_selinux) -def system_prepare_system_setup_apparmor(cluster): - cluster.nodes['all'].get_new_nodes_or_self().call(apparmor.setup_apparmor) +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_setup_apparmor(group: NodeGroup): + group.call(apparmor.setup_apparmor) -def system_prepare_system_disable_firewalld(cluster): - cluster.nodes['all'].get_new_nodes_or_self().call(system.disable_firewalld) +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_disable_firewalld(group: NodeGroup): + group.call(system.disable_firewalld) -def system_prepare_system_disable_swap(cluster): - cluster.nodes['all'].get_new_nodes_or_self().call(system.disable_swap) +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_disable_swap(group: NodeGroup): + group.call(system.disable_swap) -def system_prepare_system_modprobe(cluster): - cluster.nodes['all'].get_new_nodes_or_self().call(system.setup_modprobe) +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_system_modprobe(group: NodeGroup): + group.call(system.setup_modprobe) -def system_install_audit(cluster): - group = 
cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')).get_new_nodes_or_self() - cluster.log.debug(group.call(audit.install)) +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def system_install_audit(group: NodeGroup): + group.cluster.log.debug(group.call(audit.install)) -def system_prepare_audit_daemon(cluster): - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')).get_new_nodes_or_self() - cluster.log.debug(group.call(audit.apply_audit_rules)) +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def system_prepare_audit_daemon(group: NodeGroup): + group.cluster.log.debug(group.call(audit.apply_audit_rules)) -def system_prepare_policy(cluster): +@_applicable_for_new_nodes_with_roles('control-plane') +def system_prepare_policy(group: NodeGroup): """ Task generates rules for logging kubernetes and on audit """ - - audit_log_dir = os.path.dirname(cluster.inventory['services']['kubeadm']['apiServer']['extraArgs']['audit-log-path']) - audit_policy_dir = os.path.dirname(cluster.inventory['services']['kubeadm']['apiServer']['extraArgs']['audit-policy-file']) - cluster.nodes['control-plane'].run(f"sudo mkdir -p {audit_log_dir} && sudo mkdir -p {audit_policy_dir}") - audit_file_name = cluster.inventory['services']['kubeadm']['apiServer']['extraArgs']['audit-policy-file'] + cluster = group.cluster + api_server_extra_args = cluster.inventory['services']['kubeadm']['apiServer']['extraArgs'] + audit_log_dir = os.path.dirname(api_server_extra_args['audit-log-path']) + audit_file_name = api_server_extra_args['audit-policy-file'] + audit_policy_dir = os.path.dirname(audit_file_name) + group.sudo(f"mkdir -p {audit_log_dir} && sudo mkdir -p {audit_policy_dir}") policy_config = cluster.inventory['services']['audit'].get('cluster_policy') - collect_node = cluster.nodes['control-plane'].get_new_nodes_or_self()\ - .get_ordered_members_list(provide_node_configs=True) + collect_node = 
group.get_ordered_members_list(provide_node_configs=True) if policy_config: policy_config_file = yaml.dump(policy_config) @@ -143,13 +193,13 @@ def system_prepare_policy(cluster): #download rules in cluster for node in collect_node: node['connection'].put(io.StringIO(policy_config_file), audit_file_name, sudo=True, backup=True) - audit_config = True + audit_config = True cluster.log.debug("Audit cluster policy config") else: audit_config = False cluster.log.debug("Audit cluster policy config is empty, nothing will be configured ") - if kubernetes.is_cluster_installed(cluster) and audit_config == True and cluster.context['initial_procedure'] != 'add_node': + if kubernetes.is_cluster_installed(cluster) and audit_config is True and cluster.context['initial_procedure'] != 'add_node': for control_plane in collect_node: config_new = (kubernetes.get_kubeadm_config(cluster.inventory)) control_plane['connection'].put(io.StringIO(config_new), '/etc/kubernetes/audit-on-config.yaml', sudo=True) @@ -170,21 +220,22 @@ def system_prepare_policy(cluster): "--config=/etc/kubernetes/audit-on-config.yaml") - -def system_prepare_dns_hostname(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_dns_hostname(group: NodeGroup): + cluster = group.cluster with RemoteExecutor(cluster): - for node in cluster.nodes['all'].get_new_nodes_or_self().get_ordered_members_list(provide_node_configs=True): + for node in group.get_ordered_members_list(provide_node_configs=True): cluster.log.debug("Changing hostname '%s' = '%s'" % (node["connect_to"], node["name"])) node["connection"].sudo("hostnamectl set-hostname %s" % node["name"]) -def system_prepare_dns_resolv_conf(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_dns_resolv_conf(group: NodeGroup): + cluster = group.cluster if cluster.inventory["services"].get("resolv.conf") is None: cluster.log.debug("Skipped - resolv.conf section not defined in config file") return - group = 
cluster.nodes['all'].get_new_nodes_or_self() - system.update_resolv_conf(group, config=cluster.inventory["services"].get("resolv.conf")) cluster.log.debug(group.sudo("ls -la /etc/resolv.conf; sudo lsattr /etc/resolv.conf")) @@ -201,14 +252,14 @@ def system_prepare_dns_etc_hosts(cluster): cluster.log.debug(group.sudo("ls -la /etc/hosts")) -def system_prepare_package_manager_configure(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_package_manager_configure(group: NodeGroup): + cluster = group.cluster repositories = cluster.inventory['services']['packages']['package_manager'].get("repositories") if not repositories: cluster.log.debug("Skipped - no repositories defined for configuration") return - group = cluster.nodes['all'].get_new_nodes_or_self() - group.call_batch([ packages.backup_repo, packages.add_repo @@ -223,7 +274,9 @@ def system_prepare_package_manager_configure(cluster): cluster.log.debug(packages.ls_repofiles(group)) -def system_prepare_package_manager_manage_packages(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_package_manager_manage_packages(group: NodeGroup): + cluster = group.cluster if not cluster.inventory["services"].get("packages", {}): cluster.log.debug("Skipped - no packages configuration defined in config file") return @@ -253,7 +306,7 @@ def system_prepare_package_manager_manage_packages(cluster): } try: - batch_results = cluster.nodes['all'].get_new_nodes_or_self().call_batch(batch_tasks, **batch_parameters) + batch_results = group.call_batch(batch_tasks, **batch_parameters) except fabric.group.GroupException: cluster.log.verbose('Exception occurred! 
Trying to handle is there anything updated or not...') # todo develop cases when we can continue even if exception occurs @@ -274,51 +327,34 @@ def system_prepare_package_manager_manage_packages(cluster): cluster.log.verbose('No packages changed, nodes restart will not be scheduled') -def system_cri_install(cluster): +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def system_cri_install(group: NodeGroup): """ Task which is used to install CRI. Could be skipped, if CRI already installed. """ - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node': - group = group.get_new_nodes() - group.call(cri.install) -def system_cri_configure(cluster): +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def system_cri_configure(group: NodeGroup): """ Task which is used to configure CRI. Could be skipped, if CRI already configured. """ - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node': - group = group.get_new_nodes() - group.call(cri.configure) -def system_prepare_thirdparties(cluster): +@_applicable_for_new_nodes_with_roles('all') +def system_prepare_thirdparties(group: NodeGroup): + cluster = group.cluster if not cluster.inventory['services'].get('thirdparties', {}): cluster.log.debug("Skipped - no thirdparties defined in config file") return - cluster.nodes['all'].get_new_nodes_or_self().call(thirdparties.install_all_thirparties) - - -def deploy_loadbalancer_haproxy_install(cluster): - group = None - if "balancer" in cluster.nodes: - - group = cluster.nodes['balancer'] + group.call(thirdparties.install_all_thirparties) - if cluster.context['initial_procedure'] == 'add_node': - group = cluster.nodes['balancer'].get_new_nodes() - - if group is None or group.is_empty(): - cluster.log.debug('Skipped - no balancers to perform') - return 
+@_applicable_for_new_nodes_with_roles('balancer') +def deploy_loadbalancer_haproxy_install(group: NodeGroup): group.call(haproxy.install) @@ -362,6 +398,8 @@ def deploy_loadbalancer_keepalived_install(cluster): group = cluster.nodes['keepalived'].get_new_nodes() # if balancer added or removed - reconfigure all keepalives + # todo The method is currently not invoked for remove node. + # So why we try to install all keepalives for add_node but not touch them for remove_node? if not cluster.nodes['balancer'].get_changed_nodes().is_empty(): group = cluster.nodes['keepalived'].get_final_nodes() @@ -395,66 +433,40 @@ def deploy_loadbalancer_keepalived_configure(cluster): group.call(keepalived.configure) -def deploy_kubernetes_reset(cluster): - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node' and group.get_new_nodes().is_empty(): - cluster.log.debug("No kubernetes nodes to perform") - return +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def deploy_kubernetes_reset(group: NodeGroup): + group.call(kubernetes.reset_installation_env) - group.get_new_nodes_or_self().call(kubernetes.reset_installation_env) - - -def deploy_kubernetes_install(cluster): - cluster.log.debug("Setting up Kubernetes...") - - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node' and group.get_new_nodes().is_empty(): - cluster.log.debug("No kubernetes nodes to perform") - return - group.get_new_nodes_or_self().call(kubernetes.install) +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def deploy_kubernetes_install(group: NodeGroup): + group.cluster.log.debug("Setting up Kubernetes...") + group.call(kubernetes.install) - - -def deploy_kubernetes_prepull_images(cluster): - cluster.log.debug("Prepulling Kubernetes images...") - - group = 
cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node' and group.get_new_nodes().is_empty(): - cluster.log.debug("No kubernetes nodes to perform") - return - - group.get_new_nodes_or_self().call(kubernetes.images_grouped_prepull) +@_applicable_for_new_nodes_with_roles('control-plane', 'worker') +def deploy_kubernetes_prepull_images(group: NodeGroup): + group.cluster.log.debug("Prepulling Kubernetes images...") + group.call(kubernetes.images_grouped_prepull) def deploy_kubernetes_init(cluster): - group = cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')) - - if cluster.context['initial_procedure'] == 'add_node' and group.get_new_nodes().is_empty(): - cluster.log.debug("No kubernetes nodes for installation") - return - - cluster.nodes['control-plane'].get_new_nodes_or_self().call_batch([ + cluster.nodes['control-plane'].call_batch([ kubernetes.init_first_control_plane, kubernetes.join_other_control_planes ]) if 'worker' in cluster.nodes: - cluster.nodes.get('worker').get_new_nodes_or_self().new_group( + cluster.nodes.get('worker').new_group( apply_filter=lambda node: 'control-plane' not in node['roles']) \ .call(kubernetes.init_workers) - cluster.nodes['all'].get_new_nodes_or_self().call_batch([ + cluster.nodes['all'].call_batch([ kubernetes.apply_labels, kubernetes.apply_taints ]) - def deploy_coredns(cluster): config = coredns.generate_configmap(cluster.inventory) diff --git a/kubemarine/procedures/migrate_cri.py b/kubemarine/procedures/migrate_cri.py index d1b7198f8..51d1466a1 100755 --- a/kubemarine/procedures/migrate_cri.py +++ b/kubemarine/procedures/migrate_cri.py @@ -22,6 +22,7 @@ from kubemarine import kubernetes, etcd, thirdparties, cri, plugins from kubemarine.core import flow, utils from kubemarine.core.action import Action +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.resources import DynamicResources from kubemarine.cri 
import docker from kubemarine.procedures import install @@ -32,6 +33,12 @@ def enrich_inventory(inventory, cluster): if cluster.context.get("initial_procedure") != "migrate_cri": return inventory + + os_family = cluster.get_os_family() + if os_family in ('unknown', 'unsupported', 'multiple'): + raise Exception("Migration of CRI is possible only for cluster " + "with all nodes having the same and supported OS family") + enrichment_functions = [ _prepare_yum_repos, _prepare_packages, @@ -44,7 +51,7 @@ def enrich_inventory(inventory, cluster): return inventory -def _prepare_yum_repos(cluster, inventory): +def _prepare_yum_repos(cluster: KubernetesCluster, inventory: dict, finalization=False): if not cluster.procedure_inventory.get("yum", {}): cluster.log.debug("Skipped - no yum section defined in procedure config file") return inventory @@ -65,7 +72,7 @@ def _prepare_yum_repos(cluster, inventory): return inventory -def _prepare_packages(cluster, inventory): +def _prepare_packages(cluster: KubernetesCluster, inventory: dict, finalization=False): if not cluster.procedure_inventory.get("packages", {}): cluster.log.debug("Skipped - no packages defined in procedure config file") return inventory @@ -74,18 +81,24 @@ def _prepare_packages(cluster, inventory): cluster.log.debug("Skipped - no associations defined in procedure config file") return inventory - if not inventory["services"].get("packages", {}): - inventory["services"]["packages"] = {} - - if inventory["services"]["packages"].get("associations", {}): + if finalization: + # Despite we enrich OS specific section inside system.enrich_upgrade_inventory, + # we still merge global associations section because it has priority during enrichment. 
+ inventory["services"].setdefault("packages", {}).setdefault("associations", {}) default_merger.merge(inventory["services"]["packages"]["associations"], cluster.procedure_inventory["packages"]["associations"]) else: - inventory["services"]["packages"]["associations"] = cluster.procedure_inventory["packages"]["associations"] + # Merge OS family specific section. It is already enriched in packages.enrich_inventory_associations + # This effectively allows to specify only global section but not for specific OS family. + # This restriction is because system.enrich_upgrade_inventory goes after packages.enrich_inventory_associations, + # but in future the restriction can be eliminated. + default_merger.merge(inventory["services"]["packages"]["associations"][cluster.get_os_family()], + cluster.procedure_inventory["packages"]["associations"]) + return inventory -def _prepare_crictl(cluster, inventory): +def _prepare_crictl(cluster: KubernetesCluster, inventory: dict, finalization=False): if cluster.procedure_inventory.get("thirdparties", {}) \ and cluster.procedure_inventory["thirdparties"].get("/usr/bin/crictl.tar.gz", {}): @@ -100,7 +113,7 @@ def _prepare_crictl(cluster, inventory): return inventory -def _configure_containerd_on_nodes(cluster, inventory): +def _configure_containerd_on_nodes(cluster: KubernetesCluster, inventory: dict): if "cri" not in cluster.procedure_inventory or "containerRuntime" not in cluster.procedure_inventory["cri"]: raise Exception("Please specify mandatory parameter cri.containerRuntime in procedure.yaml") @@ -114,7 +127,7 @@ def _configure_containerd_on_nodes(cluster, inventory): return inventory -def _merge_containerd(cluster, inventory): +def _merge_containerd(cluster, inventory, finalization=False): if not inventory["services"].get("cri", {}): inventory["services"]["cri"] = {} @@ -131,7 +144,7 @@ def migrate_cri(cluster): _migrate_cri(cluster, cluster.nodes["control-plane"].get_ordered_members_list(provide_node_configs=True)) -def 
_migrate_cri(cluster, node_group): +def _migrate_cri(cluster: KubernetesCluster, node_group: dict): """ Migrate CRI from docker to already installed containerd. This method works node-by-node, configuring kubelet to use containerd. @@ -179,7 +192,7 @@ def _migrate_cri(cluster, node_group): node["connection"].sudo("systemctl stop kubelet") docker.prune(node["connection"]) - docker_associations = cluster.get_associations_for_node(node['connect_to'])['docker'] + docker_associations = cluster.get_associations_for_node(node['connect_to'], 'docker') node["connection"].sudo(f"systemctl disable {docker_associations['service_name']} --now; " "sudo sh -c 'rm -rf /var/lib/docker/*'") @@ -288,7 +301,7 @@ def migrate_cri_finalize_inventory(cluster, inventory_to_finalize): ] for finalize_fn in finalize_functions: cluster.log.verbose('Calling fn "%s"' % finalize_fn.__qualname__) - inventory_to_finalize = finalize_fn(cluster, inventory_to_finalize) + inventory_to_finalize = finalize_fn(cluster, inventory_to_finalize, finalization=True) return inventory_to_finalize diff --git a/kubemarine/procedures/upgrade.py b/kubemarine/procedures/upgrade.py index fd4da2729..42cb314b8 100755 --- a/kubemarine/procedures/upgrade.py +++ b/kubemarine/procedures/upgrade.py @@ -22,6 +22,7 @@ from distutils.util import strtobool from kubemarine.core.action import Action +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.resources import DynamicResources from kubemarine.core.yaml_merger import default_merger from kubemarine.core import flow @@ -106,7 +107,7 @@ def upgrade_plugins(cluster): plugins.install(cluster, upgrade_candidates) -def upgrade_containerd(cluster): +def upgrade_containerd(cluster: KubernetesCluster): """ This function fixes the incorrect version of pause during the cluster update procedure """ @@ -145,7 +146,7 @@ def upgrade_containerd(cluster): with RemoteExecutor(cluster) as exe: for node in 
cluster.nodes['control-plane'].include_group(cluster.nodes.get('worker')).get_ordered_members_list( provide_node_configs=True): - os_specific_associations = cluster.get_associations_for_node(node['connect_to'])['containerd'] + os_specific_associations = cluster.get_associations_for_node(node['connect_to'], 'containerd') node['connection'].put(StringIO(config_string), os_specific_associations['config_location'], backup=True, sudo=True, mkdir=True) @@ -174,9 +175,7 @@ def upgrade_finalize_inventory(cluster, inventory): return inventory upgrade_version = cluster.context.get("upgrade_version") - if not inventory['services'].get('kubeadm'): - inventory['services']['kubeadm'] = {} - inventory['services']['kubeadm']['kubernetesVersion'] = upgrade_version + inventory.setdefault("services", {}).setdefault("kubeadm", {})['kubernetesVersion'] = upgrade_version # if thirdparties was not defined in procedure.yaml, # then no need to forcibly place them: user may want to use default @@ -184,16 +183,14 @@ def upgrade_finalize_inventory(cluster, inventory): inventory['services']['thirdparties'] = cluster.procedure_inventory[upgrade_version]['thirdparties'] if cluster.procedure_inventory.get(upgrade_version, {}).get("plugins"): - if not inventory.get("plugins"): - inventory["plugins"] = {} + inventory.setdefault("plugins", {}) default_merger.merge(inventory["plugins"], cluster.procedure_inventory[upgrade_version]["plugins"]) if cluster.procedure_inventory.get(upgrade_version, {}).get("packages"): - if not inventory.get("services"): - inventory["services"] = {} - if not inventory["services"].get("packages"): - inventory["services"]["packages"] = {} + inventory['services'].setdefault("packages", {}) packages = cluster.procedure_inventory[upgrade_version]["packages"] + # Although we enrich the OS specific section inside system.enrich_upgrade_inventory, + # we still merge the global associations section because it has priority during enrichment.
default_merger.merge(inventory["services"]["packages"], packages) return inventory diff --git a/kubemarine/selinux.py b/kubemarine/selinux.py index ec1fbd6c2..0f9c38f32 100644 --- a/kubemarine/selinux.py +++ b/kubemarine/selinux.py @@ -139,7 +139,7 @@ def get_selinux_status(group): def is_config_valid(group, state=None, policy=None, permissive=None): log = group.cluster.log - if group.get_nodes_os(suppress_exceptions=True) == 'debian': + if group.get_nodes_os() == 'debian': log.debug("Skipped - selinux is not supported on Ubuntu/Debian os family") return @@ -192,7 +192,7 @@ def setup_selinux(group): log = group.cluster.log # this method handles cluster with multiple os, suppressing should be enabled - if group.get_nodes_os(suppress_exceptions=True) not in ['rhel', 'rhel8']: + if group.get_nodes_os() not in ['rhel', 'rhel8']: log.debug("Skipped - selinux is not supported on Ubuntu/Debian os family") return diff --git a/kubemarine/system.py b/kubemarine/system.py index 35a6dc7a0..02fb050c3 100644 --- a/kubemarine/system.py +++ b/kubemarine/system.py @@ -101,16 +101,21 @@ def enrich_inventory(inventory, cluster): return inventory -def enrich_upgrade_inventory(inventory, cluster): +def enrich_upgrade_inventory(inventory: dict, cluster: KubernetesCluster): if cluster.context.get("initial_procedure") != "upgrade": return inventory + os_family = cluster.get_os_family() + if os_family in ('unknown', 'unsupported', 'multiple'): + raise Exception("Upgrade is possible only for cluster " + "with all nodes having the same and supported OS family") + # validate all packages sections in procedure inventory with open(utils.get_resource_absolute_path('resources/configurations/defaults.yaml', script_relative=True), 'r') \ as stream: - base_associations = yaml.safe_load(stream)["services"]["packages"]["associations"][get_os_family(cluster)] + base_associations = yaml.safe_load(stream)["services"]["packages"]["associations"][os_family] - cluster_associations = 
deepcopy(cluster.inventory["services"]["packages"]["associations"]) + cluster_associations = deepcopy(inventory["services"]["packages"]["associations"][os_family]) previous_ver = cluster.context["initial_kubernetes_version"] upgrade_plan = cluster.procedure_inventory.get('upgrade_plan') for version in upgrade_plan: @@ -130,9 +135,16 @@ def enrich_upgrade_inventory(inventory, cluster): cluster.context["packages"] = {"upgrade_required": upgrade_required} upgrade_ver = cluster.context["upgrade_version"] - packages_section = cluster.procedure_inventory.get(upgrade_ver, {}).get("packages") - if packages_section: - default_merger.merge(inventory["services"]["packages"], packages_section) + packages_section = deepcopy(cluster.procedure_inventory.get(upgrade_ver, {}).get("packages", {})) + # Move associations to the OS family specific section, and then merge with associations from procedure. + # This effectively allows to specify only global section but not for specific OS family. + # This restriction is because system.enrich_upgrade_inventory goes after packages.enrich_inventory_associations, + # but in future the restriction can be eliminated. 
+    associations = packages_section.pop("associations", {}) +    default_merger.merge(inventory["services"]["packages"]["associations"][os_family], associations) + +    # merge the remaining packages section +    default_merger.merge(inventory["services"]["packages"], packages_section) return inventory @@ -143,7 +155,7 @@ def get_system_packages_for_upgrade(cluster): compatibility = cluster.globals["compatibility_map"]["software"] # handle special cases in which upgrade is not required for particular package - cluster_associations = cluster.inventory["services"]["packages"]["associations"] + cluster_associations = cluster.inventory["services"]["packages"]["associations"][cluster.get_os_family()] upgrade_associations = cluster.procedure_inventory.get(upgrade_ver, {}).get("packages", {}).get("associations", {}) system_packages = get_system_packages(cluster) upgrade_required = list(system_packages) @@ -183,7 +195,6 @@ def detect_os_family(cluster): stdout = result.stdout.lower() version = None - versions = [] lines = '' version_regex = re.compile("\\s\\d*\\.\\d*", re.M) @@ -208,18 +219,14 @@ cluster.log.debug("Distribution: %s; Version: %s" % (name, version)) + os_family = 'unsupported' if name in cluster.globals["compatibility_map"]["distributives"]: + os_family = 'unknown' os_family_list = cluster.globals["compatibility_map"]["distributives"][name] for os_family_item in os_family_list: - versions.extend(os_family_item["versions"]) - if version in versions: + if version in os_family_item["versions"]: os_family = os_family_item["os_family"] - versions = [] break - else: - os_family = 'unknown' - else: - os_family = 'unsupported' cluster.log.debug("OS family: %s" % os_family) @@ -229,19 +236,6 @@ 'family': os_family } - cluster.context["os"] = cluster.nodes["all"].get_accessible_nodes().get_nodes_os(suppress_exceptions=True) - - -def get_os_family(cluster: KubernetesCluster) -> str: - """ - Returns common OS family name 
from remote hosts. - :param cluster: Cluster object where OS family is detected. - :return: Detected OS family, possible values: "debian", "rhel", "rhel8", "multiple", "unknown". - """ - # OS family is always not None, - # because it is either detected during cluster initialization, or there are no accessible nodes to detect it. - return cluster.context.get("os") - def get_compatibility_version_key(cluster: KubernetesCluster) -> str or None: """ @@ -253,7 +247,7 @@ def get_compatibility_version_key(cluster: KubernetesCluster) -> str or None: Return os-specific version compatibility key. If OS is unknown or multiple OS, then returns None. """ - os = get_os_family(cluster) + os = cluster.get_os_family() if os == "rhel": return "version_rhel" elif os == "rhel8": @@ -264,10 +258,6 @@ def get_compatibility_version_key(cluster: KubernetesCluster) -> str or None: return None -def is_multiple_os_detected(cluster): - return get_os_family(cluster) == 'multiple' - - def update_resolv_conf(group, config=None): if config is None: raise Exception("Data can't be empty") @@ -631,7 +621,7 @@ def is_modprobe_valid(group): def verify_system(group): log = group.cluster.log # this method handles clusters with multiple is, suppress exceptions enabled - os_family = group.get_nodes_os(suppress_exceptions=True) + os_family = group.get_nodes_os() if os_family in ['rhel', 'rhel8'] and group.cluster.is_task_completed('prepare.system.setup_selinux'): log.debug("Verifying Selinux...") diff --git a/test/unit/core/test_cluster.py b/test/unit/core/test_cluster.py index 51b754857..27675aecd 100644 --- a/test/unit/core/test_cluster.py +++ b/test/unit/core/test_cluster.py @@ -17,6 +17,11 @@ import unittest from kubemarine import demo +from kubemarine.demo import FakeKubernetesCluster + + +def get_os_family(cluster: FakeKubernetesCluster): + return cluster.get_os_family() class KubernetesClusterTest(unittest.TestCase): @@ -60,3 +65,46 @@ def test_make_group_from_mixed_types(self): ]) 
self.assertEqual(self.cluster.nodes['all'], actual_group, msg="Created group is not equivalent to all nodes group") + + def test_get_os_family(self): + cluster = demo.new_cluster(demo.generate_inventory(**demo.MINIHA_KEEPALIVED)) + self.assertEqual('rhel', get_os_family(cluster), + msg="Demo cluster should be created with 'rhel' OS family by default") + + def test_get_os_family_multiple(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + context = demo.create_silent_context() + host_different_os = inventory['nodes'][0]['address'] + context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + cluster = demo.new_cluster(inventory, context=context) + self.assertEqual('multiple', get_os_family(cluster), + msg="One node has different OS family and thus global OS family should be 'multiple'") + + def test_add_node_different_os_get_os_family_multiple(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + context = demo.create_silent_context(procedure='add_node') + host_different_os = inventory['nodes'][0]['address'] + context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + add_node = {'nodes': [inventory['nodes'].pop(0)]} + cluster = demo.new_cluster(inventory, procedure_inventory=add_node, context=context) + self.assertEqual('multiple', get_os_family(cluster), + msg="One node has different OS family and thus global OS family should be 'multiple'") + + def test_remove_node_different_os_get_os_family_single(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + context = demo.create_silent_context(procedure='remove_node') + host_different_os = inventory['nodes'][0]['address'] + context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + remove_node = {"nodes": [{"name": inventory["nodes"][0]["name"]}]} + cluster = demo.new_cluster(inventory, procedure_inventory=remove_node, context=context) + self.assertEqual('debian', 
get_os_family(cluster), + msg="The node with the different OS family is removed and thus the global OS family should be 'debian'") + + def _nodes_context_one_different_os(self, inventory, host_different_os): + nodes_context = demo.generate_nodes_context(inventory, os_name='ubuntu', os_version='20.04') + nodes_context[host_different_os]['os'] = { + 'name': 'centos', + 'family': 'rhel', + 'version': '7.9' + } + return nodes_context diff --git a/test/unit/core/test_flow.py b/test/unit/core/test_flow.py index b42ebb8b5..5642cb0c6 100755 --- a/test/unit/core/test_flow.py +++ b/test/unit/core/test_flow.py @@ -193,8 +193,8 @@ def test_run_flow(self): deploy.loadbalancer.haproxy, deploy.loadbalancer.keepalived, deploy.accounts, overview.") def test_run_tasks(self): - context = demo.create_silent_context(flow.new_tasks_flow_parser("Help text"), - ['--tasks', 'deploy.loadbalancer.haproxy']) + context = demo.create_silent_context(['--tasks', 'deploy.loadbalancer.haproxy'], + parser=flow.new_tasks_flow_parser("Help text")) inventory = demo.generate_inventory(**demo.FULLHA) cluster = demo.new_cluster(inventory, context=context) flow.run_tasks(demo.FakeResources(context, inventory, cluster=cluster), tasks) @@ -205,7 +205,7 @@ def test_detect_nodes_context(self): inventory = demo.generate_inventory(**demo.FULLHA) hosts = [node["address"] for node in inventory["nodes"]] self._stub_detect_nodes_context(inventory, hosts, hosts) - context = demo.create_silent_context(flow.new_tasks_flow_parser("Help text"), []) + context = demo.create_silent_context([], parser=flow.new_tasks_flow_parser("Help text")) res = demo.FakeResources(context, inventory, fake_shell=self.light_fake_shell) # not throws any exception during cluster initialization flow.run_tasks(res, tasks) @@ -213,7 +213,7 @@ def test_detect_nodes_context(self): self.assertEqual(4, cluster.context["test_info"], "Here should be all 4 calls of test_func") - self.assertEqual("rhel", cluster.context["os"]) + self.assertEqual("rhel", 
cluster.get_os_family()) for host, node_context in cluster.context["nodes"].items(): self.assertEqual({'online': True, 'accessible': True, 'sudo': 'Root'}, node_context["access"]) self.assertEqual({'name': 'centos', 'version': '7.6', 'family': 'rhel'}, node_context["os"]) @@ -223,14 +223,14 @@ def test_not_sudoer_does_not_interrupt_enrichment(self): inventory = demo.generate_inventory(**demo.FULLHA) hosts = [node["address"] for node in inventory["nodes"]] self._stub_detect_nodes_context(inventory, hosts, []) - context = demo.create_silent_context(flow.new_tasks_flow_parser("Help text"), []) + context = demo.create_silent_context([], parser=flow.new_tasks_flow_parser("Help text")) res = demo.FakeResources(context, inventory, fake_shell=self.light_fake_shell) flow.run_tasks(res, tasks) cluster = res.cluster() self.assertEqual(4, cluster.context["test_info"], "Here should be all 4 calls of test_func") - self.assertEqual("rhel", cluster.context["os"]) + self.assertEqual("rhel", cluster.get_os_family()) for host, node_context in cluster.context["nodes"].items(): self.assertEqual({'online': True, 'accessible': True, 'sudo': 'No'}, node_context["access"]) # continue to collect info @@ -242,7 +242,7 @@ def test_any_offline_node_interrupts(self): online_hosts = [node["address"] for node in inventory["nodes"]] offline = online_hosts.pop(random.randrange(len(online_hosts))) self._stub_detect_nodes_context(inventory, online_hosts, []) - context = demo.create_silent_context(flow.new_tasks_flow_parser("Help text"), []) + context = demo.create_silent_context([], parser=flow.new_tasks_flow_parser("Help text")) res = demo.FakeResources(context, inventory, fake_shell=self.light_fake_shell) exc = None @@ -265,8 +265,8 @@ def test_removed_node_can_be_offline(self): procedure_inventory = {"nodes": [{"name": inventory["nodes"][masters[i]]["name"]}]} self._stub_detect_nodes_context(inventory, online_hosts, []) - context = demo.create_silent_context(flow.new_procedure_parser("Help text"), 
['fake_path.yaml'], - procedure='remove_node') + context = demo.create_silent_context(['fake_path.yaml'], procedure='remove_node', + parser=flow.new_procedure_parser("Help text")) res = demo.FakeResources(context, inventory, procedure_inventory=procedure_inventory, fake_shell=self.light_fake_shell) diff --git a/test/unit/core/test_run_actions.py b/test/unit/core/test_run_actions.py index d6e8c1f5b..f18d07a54 100644 --- a/test/unit/core/test_run_actions.py +++ b/test/unit/core/test_run_actions.py @@ -31,7 +31,7 @@ def recreate_inventory(self): class RunActionsTest(unittest.TestCase): def setUp(self) -> None: - self.context = demo.create_silent_context(flow.new_common_parser("Help text"), []) + self.context = demo.create_silent_context() self.context['preserve_inventory'] = True self.inventory = demo.generate_inventory(**demo.FULLHA) self.cluster: demo.FakeKubernetesCluster = demo.new_cluster(self.inventory, context=self.context) diff --git a/test/unit/test_audit.py b/test/unit/test_audit.py index af0399956..97ad6ad97 100644 --- a/test/unit/test_audit.py +++ b/test/unit/test_audit.py @@ -20,6 +20,7 @@ from kubemarine import demo, audit from kubemarine.core.group import NodeGroupResult +from kubemarine.demo import FakeKubernetesCluster class NodeGroupResultsTest(unittest.TestCase): @@ -28,13 +29,20 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.inventory = demo.generate_inventory(**demo.FULLHA) + def new_debian_cluster(self) -> FakeKubernetesCluster: + context = demo.create_silent_context() + context['nodes'] = demo.generate_nodes_context(self.inventory, os_name='ubuntu', os_version='20.04') + return demo.new_cluster(self.inventory, context=context) + def test_audit_installation_for_centos(self): - cluster = demo.new_cluster(self.inventory, os_name='centos', os_version='7.9') + context = demo.create_silent_context() + context['nodes'] = demo.generate_nodes_context(self.inventory, os_name='centos', os_version='7.9') + cluster = 
demo.new_cluster(self.inventory, context=context) audit.install(cluster.nodes['master']) def test_audit_installation_for_debian(self): - cluster = demo.new_cluster(self.inventory, os_name='ubuntu', os_version='20.04') - package_associations = cluster.inventory['services']['packages']['associations']['audit'] + cluster = self.new_debian_cluster() + package_associations = cluster.inventory['services']['packages']['associations']['debian']['audit'] package_name = package_associations['package_name'] service_name = package_associations['service_name'] @@ -60,8 +68,8 @@ def test_audit_installation_for_debian(self): audit.install(cluster.nodes['master']) def test_audit_installation_when_already_installed_for_debian(self): - cluster = demo.new_cluster(self.inventory, os_name='ubuntu', os_version='20.04') - package_associations = cluster.inventory['services']['packages']['associations']['audit'] + cluster = self.new_debian_cluster() + package_associations = cluster.inventory['services']['packages']['associations']['debian']['audit'] package_name = package_associations['package_name'] @@ -74,9 +82,9 @@ def test_audit_installation_when_already_installed_for_debian(self): audit.install(cluster.nodes['master']) def test_audit_installation_when_partly_installed_for_debian(self): - cluster = demo.new_cluster(self.inventory, os_name='ubuntu', os_version='20.04') + cluster = self.new_debian_cluster() all_nodes_group = cluster.nodes['all'].nodes - package_associations = cluster.inventory['services']['packages']['associations']['audit'] + package_associations = cluster.inventory['services']['packages']['associations']['debian']['audit'] package_name = package_associations['package_name'] service_name = package_associations['service_name'] @@ -118,8 +126,8 @@ def test_audit_installation_when_partly_installed_for_debian(self): msg="Installation task did not finished with audit enable command") def test_audit_configuring(self): - cluster = demo.new_cluster(self.inventory, 
os_name='ubuntu', os_version='20.04') - package_associations = cluster.inventory['services']['packages']['associations']['audit'] + cluster = self.new_debian_cluster() + package_associations = cluster.inventory['services']['packages']['associations']['debian']['audit'] package_name = package_associations['package_name'] config_location = package_associations['config_location'] diff --git a/test/unit/test_haproxy.py b/test/unit/test_haproxy.py index 0286bdb5a..18ad95a5f 100755 --- a/test/unit/test_haproxy.py +++ b/test/unit/test_haproxy.py @@ -68,7 +68,7 @@ def test_haproxy_installation_when_already_installed(self): inventory = demo.generate_inventory(**demo.FULLHA) cluster = demo.new_cluster(inventory) - package_associations = cluster.inventory['services']['packages']['associations']['haproxy'] + package_associations = cluster.inventory['services']['packages']['associations']['rhel']['haproxy'] # simulate already installed haproxy package expected_results_1 = demo.create_nodegroup_result(cluster.nodes['balancer'], stdout='Haproxy v1.2.3') @@ -98,7 +98,7 @@ def test_haproxy_installation_when_not_installed(self): inventory = demo.generate_inventory(**demo.FULLHA) cluster = demo.new_cluster(inventory) - package_associations = cluster.inventory['services']['packages']['associations']['haproxy'] + package_associations = cluster.inventory['services']['packages']['associations']['rhel']['haproxy'] # simulate haproxy package missing missing_package_command = ['%s -v' % package_associations['executable_name']] diff --git a/test/unit/test_keepalived.py b/test/unit/test_keepalived.py index 8da244a15..f85dba54c 100755 --- a/test/unit/test_keepalived.py +++ b/test/unit/test_keepalived.py @@ -140,7 +140,7 @@ def test_keepalived_installation_when_already_installed(self): inventory = demo.generate_inventory(**demo.FULLHA_KEEPALIVED) cluster = demo.new_cluster(inventory) - package_associations = cluster.inventory['services']['packages']['associations']['keepalived'] + 
package_associations = cluster.inventory['services']['packages']['associations']['rhel']['keepalived'] # simulate already installed keepalived package expected_results_1 = demo.create_nodegroup_result(cluster.nodes['keepalived'], stdout='Keepalived v1.2.3') @@ -174,7 +174,7 @@ def test_keepalived_installation_when_not_installed(self): inventory = demo.generate_inventory(**demo.FULLHA_KEEPALIVED) cluster = demo.new_cluster(inventory) - package_associations = cluster.inventory['services']['packages']['associations']['keepalived'] + package_associations = cluster.inventory['services']['packages']['associations']['rhel']['keepalived'] # simulate keepalived package missing missing_package_command = ['%s -v' % package_associations['executable_name']] @@ -239,7 +239,7 @@ def test_config_apply(self): node = cluster.nodes['keepalived'].get_first_member(provide_node_configs=True) expected_config = keepalived.generate_config(cluster.inventory, node) - package_associations = cluster.inventory['services']['packages']['associations']['keepalived'] + package_associations = cluster.inventory['services']['packages']['associations']['rhel']['keepalived'] configs_directory = '/'.join(package_associations['config_location'].split('/')[:-1]) # simulate mkdir for configs diff --git a/test/unit/test_migrate_cri.py b/test/unit/test_migrate_cri.py new file mode 100644 index 000000000..df21d749a --- /dev/null +++ b/test/unit/test_migrate_cri.py @@ -0,0 +1,59 @@ +# Copyright 2021-2022 NetCracker Technology Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from copy import deepcopy + +from kubemarine import demo +from kubemarine.core import utils + + +def generate_migrate_cri_environment() -> (dict, dict): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + inventory['services']['cri'] = { + 'containerRuntime': 'docker' + } + context = demo.create_silent_context(procedure='migrate_cri') + return inventory, context + + +class MigrateCriPackagesEnrichment(unittest.TestCase): + def prepare_procedure_inventory(self): + return { + 'cri': { + 'containerRuntime': 'containerd' + }, + 'packages': { + 'associations': { + 'containerd': { + 'package_name': 'containerd' + } + } + } + } + + def test_enrich_packages_propagate_associations(self): + inventory, context = generate_migrate_cri_environment() + migrate_cri = self.prepare_procedure_inventory() + cluster = demo.new_cluster(inventory, procedure_inventory=migrate_cri, context=context) + self.assertEqual('containerd', cluster.inventory['services']['packages']['associations']['rhel']['containerd']['package_name'], + "Associations packages are enriched incorrectly") + + def test_final_inventory_enrich_global(self): + inventory, context = generate_migrate_cri_environment() + migrate_cri = self.prepare_procedure_inventory() + cluster = demo.new_cluster(deepcopy(inventory), procedure_inventory=deepcopy(migrate_cri), context=context) + final_inventory = utils.get_final_inventory(cluster, inventory) + self.assertEqual(migrate_cri['packages'], final_inventory['services']['packages'], + "Final inventory is recreated incorrectly") diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py new file mode 100644 index 000000000..ca9e363e2 --- /dev/null +++ b/test/unit/test_packages.py @@ -0,0 +1,452 @@ +# Copyright 2021-2022 NetCracker Technology Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not 
use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from copy import deepcopy +from typing import Optional, Dict + +from kubemarine import demo, packages +from kubemarine.core import static, defaults, log +from kubemarine.demo import FakeKubernetesCluster +from kubemarine.procedures import add_node + + +def new_debian_cluster(inventory: dict) -> FakeKubernetesCluster: + context = demo.create_silent_context() + context['nodes'] = demo.generate_nodes_context(inventory, os_name='ubuntu', os_version='20.04') + return demo.new_cluster(inventory, context=context) + + +def prepare_compiled_associations_defaults() -> dict: + defs = deepcopy(static.DEFAULTS) + defs['cluster_name'] = 'k8s.fake.local' + context = demo.create_silent_context() + logger = log.init_log_from_context_args(static.GLOBALS, context, defs).logger + + root = deepcopy(defs) + root['globals'] = static.GLOBALS + return defaults.compile_object(logger, defs['services']['packages']['associations'], root) + + +COMPILED_ASSOCIATIONS_DEFAULTS = prepare_compiled_associations_defaults() + + +def get_compiled_defaults(): + return deepcopy(COMPILED_ASSOCIATIONS_DEFAULTS) + + +def global_associations(inventory: dict) -> dict: + return inventory.setdefault('services', {}).setdefault('packages', {}).setdefault('associations', {}) + + +def os_family_associations(inventory: dict, os_family: str) -> dict: + return global_associations(inventory).setdefault(os_family, {}) + + +def package_associations(inventory: dict, os_family: Optional[str], package: str) -> dict: + return 
(os_family_associations(inventory, os_family) if os_family else global_associations(inventory))\ + .setdefault(package, {}) + + +def set_cache_versions_false(inventory: dict, os_family: Optional[str], package: Optional[str]): + section = inventory.setdefault('services', {}).setdefault('packages', {}) + if os_family or package: + section = section.setdefault('associations', {}) + if os_family: + section = section.setdefault(os_family, {}) + if package: + section = section.setdefault(package, {}) + section['cache_versions'] = False + + +def get_package_name(os_family, package) -> str: + return packages.get_package_name(os_family, package) + + +def make_finalized_inventory(cluster: FakeKubernetesCluster): + return cluster.make_finalized_inventory() + + +def cache_installed_packages(cluster: FakeKubernetesCluster): + add_node.cache_installed_packages(cluster) + + +class AssociationsEnrichment(unittest.TestCase): + def test_simple_enrich_defaults(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + cluster = new_debian_cluster(inventory) + associations = global_associations(cluster.inventory) + self.assertEqual(packages.get_associations_os_family_keys(), associations.keys(), + "Associations should have only OS family specific sections") + self.assertEqual(get_compiled_defaults(), associations, + "Enriched associations of the cluster does not equal to enriched defaults") + + def test_redefine_os_specific_section(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + expected_pkgs = 'docker-ce' + package_associations(inventory, 'debian', 'docker')['package_name'] = expected_pkgs + cluster = new_debian_cluster(inventory) + associations = global_associations(cluster.inventory) + + defs = get_compiled_defaults() + self.assertNotEqual(expected_pkgs, defs['debian']['docker']['package_name']) + defs['debian']['docker']['package_name'] = expected_pkgs + self.assertEqual(defs, associations, + "Debian associations section was not enriched") + 
+ def test_propagate_global_section_to_os_specific(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + expected_pkgs_1 = 'docker-ce' + expected_pkgs_2 = ['podman', 'containerd=1.5.*'] + package_associations(inventory, None, 'docker')['package_name'] = expected_pkgs_1 + package_associations(inventory, None, 'containerd')['package_name'] = expected_pkgs_2 + cluster = new_debian_cluster(inventory) + associations = global_associations(cluster.inventory) + self.assertEqual(packages.get_associations_os_family_keys(), associations.keys(), + "Associations should have only OS family specific sections") + + defs = get_compiled_defaults() + self.assertNotEqual(expected_pkgs_1, defs['debian']['docker']['package_name']) + self.assertNotEqual(expected_pkgs_2, defs['debian']['containerd']['package_name']) + defs['debian']['docker']['package_name'] = expected_pkgs_1 + defs['debian']['containerd']['package_name'] = expected_pkgs_2 + self.assertEqual(defs, associations, + "Debian associations section was not enriched") + + def test_error_if_global_section_redefined_for_multiple_os(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + expected_pkgs = 'docker-ce' + package_associations(inventory, None, 'docker')['package_name'] = expected_pkgs + context = demo.create_silent_context() + host_different_os = inventory['nodes'][0]['address'] + context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + with self.assertRaisesRegex(Exception, packages.ERROR_GLOBAL_ASSOCIATIONS_REDEFINED_MULTIPLE_OS): + demo.new_cluster(inventory, context=context) + + def test_error_if_global_section_redefined_for_add_node_different_os(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + expected_pkgs = 'docker-ce' + package_associations(inventory, None, 'docker')['package_name'] = expected_pkgs + context = demo.create_silent_context(procedure='add_node') + host_different_os = inventory['nodes'][0]['address'] + 
context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + add_node = {'nodes': [inventory['nodes'].pop(0)]} + with self.assertRaisesRegex(Exception, packages.ERROR_GLOBAL_ASSOCIATIONS_REDEFINED_MULTIPLE_OS): + demo.new_cluster(inventory, procedure_inventory=add_node, context=context) + + def test_success_if_os_specific_section_redefined_for_add_node_different_os(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + expected_pkgs = 'docker-ce' + package_associations(inventory, 'rhel', 'docker')['package_name'] = expected_pkgs + context = demo.create_silent_context(procedure='add_node') + host_different_os = inventory['nodes'][0]['address'] + context['nodes'] = self._nodes_context_one_different_os(inventory, host_different_os) + add_node = {'nodes': [inventory['nodes'].pop(0)]} + # no error + demo.new_cluster(inventory, procedure_inventory=add_node, context=context) + + def test_cache_versions_false_use_recommended_versions(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + set_cache_versions_false(inventory, None, None) + cluster = new_debian_cluster(inventory) + associations = global_associations(cluster.inventory) + self.assertEqual(get_compiled_defaults(), associations, + "Even if cache_versions == false, we still need to use recommended versions") + + def test_remove_unused_os_family_associations(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + cluster = new_debian_cluster(inventory) + finalized_inventory = packages.remove_unused_os_family_associations(cluster, cluster.inventory) + self.assertEqual({'debian'}, global_associations(finalized_inventory).keys()) + + def _nodes_context_one_different_os(self, inventory, host_different_os): + nodes_context = demo.generate_nodes_context(inventory, os_name='ubuntu', os_version='20.04') + nodes_context[host_different_os]['os'] = { + 'name': 'centos', + 'family': 'rhel', + 'version': '7.9' + } + return nodes_context + + +class 
PackagesUtilities(unittest.TestCase): + def test_get_package_name_rhel(self): + self.assertEqual('docker-ce', get_package_name('rhel', 'docker-ce-19.03.15-3.el7.x86_64')) + self.assertEqual('docker-ce', get_package_name('rhel', 'docker-ce-19.03*')) + self.assertEqual('docker-ce', get_package_name('rhel', 'docker-ce-*')) + self.assertEqual('docker-ce', get_package_name('rhel', 'docker-ce')) + + def test_get_package_name_debian(self): + self.assertEqual('containerd', get_package_name('debian', 'containerd=1.5.9-0ubuntu1~20.04.4')) + self.assertEqual('containerd', get_package_name('debian', 'containerd=1.5.*')) + self.assertEqual('containerd', get_package_name('debian', 'containerd=*')) + self.assertEqual('containerd', get_package_name('debian', 'containerd')) + + def test_detect_versions_debian(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + context = demo.create_silent_context() + context['nodes'] = demo.generate_nodes_context(inventory, os_name='ubuntu', os_version='20.04') + cluster = demo.new_cluster(inventory, context=context) + + expected_pkg = 'containerd=1.5.9-0ubuntu1~20.04.4' + queried_pkg = 'containerd=1.5.*' + group = cluster.nodes['all'] + results = demo.create_nodegroup_result(group, stdout=expected_pkg) + cluster.fake_shell.add(results, 'sudo', [packages.get_detect_package_version_cmd('debian', 'containerd')]) + + detected_packages = packages.detect_installed_packages_version_groups(group, queried_pkg) + self.assertEqual({queried_pkg}, detected_packages.keys(), + "Incorrect initially queries package") + + package_versions = detected_packages[queried_pkg] + self.assertEqual({expected_pkg}, package_versions.keys(), + "Incorrect detected package versions") + self.assertEqual(set(group.get_hosts()), set(package_versions[expected_pkg]), + "Incorrect set of hosts with detected package version") + + def test_detect_versions_rhel(self): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + context = 
demo.create_silent_context() + context['nodes'] = demo.generate_nodes_context(inventory, os_name='centos', os_version='7.9') + cluster = demo.new_cluster(inventory, context=context) + + expected_pkg = 'docker-ce-19.03.15-3.el7.x86_64' + queried_pkg = 'docker-ce-19.03*' + group = cluster.nodes['all'] + results = demo.create_nodegroup_result(group, stdout=expected_pkg) + cluster.fake_shell.add(results, 'sudo', [packages.get_detect_package_version_cmd('rhel', 'docker-ce')]) + + detected_packages = packages.detect_installed_packages_version_groups(group, [queried_pkg]) + self.assertEqual({queried_pkg}, detected_packages.keys(), + "Incorrect initially queries package") + + package_versions = detected_packages[queried_pkg] + self.assertEqual({expected_pkg}, package_versions.keys(), + "Incorrect detected package versions") + self.assertEqual(set(group.get_hosts()), set(package_versions[expected_pkg]), + "Incorrect set of hosts with detected package version") + + +class CacheVersions(unittest.TestCase): + def setUp(self) -> None: + self.fake_shell = demo.FakeShell() + self.inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + self.context = demo.create_silent_context(procedure='add_node') + self.context['nodes'] = demo.generate_nodes_context(self.inventory, os_name='ubuntu', os_version='20.04') + self.hosts = [node['address'] for node in self.inventory['nodes']] + self.new_host = self.inventory['nodes'][0]['address'] + self.procedure_inventory = {'nodes': [self.inventory['nodes'].pop(0)]} + self.initial_hosts = [node['address'] for node in self.inventory['nodes']] + + def _new_cluster(self): + cluster = FakeKubernetesCluster(self.inventory, self.context, procedure_inventory=self.procedure_inventory, + fake_shell=self.fake_shell) + cluster.enrich() + return cluster + + def _stub_detect_package_result(self, package, hosts_stub: Dict[str, str]): + results = {} + for host in self.hosts: + if host in hosts_stub: + results[host] = 
demo.create_result(stdout=hosts_stub[host]) + else: + results[host] = demo.create_result(stdout='not installed') + + cmd = packages.get_detect_package_version_cmd('debian', package) + self.fake_shell.add(results, 'sudo', [cmd]) + + def _stub_associations_packages(self, packages_hosts_stub: Dict[str, Dict[str, str]]): + packages_list = [] + for association_params in get_compiled_defaults()['debian'].values(): + pkgs = association_params['package_name'] + if isinstance(pkgs, str): + pkgs = [pkgs] + + packages_list.extend(pkgs) + + packages_list = list(set(packages_list)) + for package in packages_list: + package = get_package_name('debian', package) + self._stub_detect_package_result(package, packages_hosts_stub.get(package, {})) + + def _packages_install(self, inventory: dict): + return inventory.setdefault('services', {}).setdefault('packages', {}).setdefault('install', []) + + def _packages_include(self, inventory: dict): + return inventory.setdefault('services', {}).setdefault('packages', {}).setdefault('install', {})\ + .setdefault('include', []) + + def _stub_custom_packages(self, packages_hosts_stub: Dict[str, Dict[str, str]]): + for package, hosts_stub in packages_hosts_stub.items(): + self._packages_install(self.inventory).append(package) + package = get_package_name('debian', package) + self._stub_detect_package_result(package, hosts_stub) + + def test_cache_versions_and_finalize_inventory(self): + self._stub_associations_packages({ + 'containerd': {host: 'containerd=1.5.9-0ubuntu1~20.04.4' for host in self.initial_hosts}, + 'auditd': {host: 'auditd=1:2.8.5-2ubuntu6' for host in self.initial_hosts}, + }) + self._stub_custom_packages({ + 'curl': {host: 'curl=7.68.0-1ubuntu2.14' for host in self.hosts}, + 'unzip': {host: 'unzip=6.0-25ubuntu1.1' for host in self.hosts}, + }) + + cluster = self._new_cluster() + cache_installed_packages(cluster) + + self.assertEqual('containerd=1.5.9-0ubuntu1~20.04.4', + package_associations(cluster.inventory, 'debian', 
'containerd')['package_name'][0], + "containerd was not detected") + self.assertEqual('auditd=1:2.8.5-2ubuntu6', + package_associations(cluster.inventory, 'debian', 'audit')['package_name'], + "auditd was not detected") + self.assertEqual({'curl', 'unzip'}, set(self._packages_include(cluster.inventory)), + "Custom packages versions should be not detected when adding node") + + finalized_inventory = make_finalized_inventory(cluster) + self.assertEqual('containerd=1.5.9-0ubuntu1~20.04.4', + package_associations(finalized_inventory, 'debian', 'containerd')['package_name'][0], + "containerd was not detected") + self.assertEqual('auditd=1:2.8.5-2ubuntu6', + package_associations(finalized_inventory, 'debian', 'audit')['package_name'], + "auditd was not detected") + self.assertEqual({'curl=7.68.0-1ubuntu2.14', 'unzip=6.0-25ubuntu1.1'}, set(self._packages_include(finalized_inventory)), + "Custom packages versions should be detected in finalized inventory") + + def test_cache_versions_global_off(self): + expected_containerd = 'containerd=1.5.9-0ubuntu1~20.04.4' + default_containerd = get_compiled_defaults()['debian']['containerd']['package_name'][0] + self.assertNotEqual(expected_containerd, default_containerd) + + self._stub_associations_packages({ + 'containerd': {host: expected_containerd for host in self.initial_hosts}, + }) + + set_cache_versions_false(self.inventory, None, None) + cluster = self._new_cluster() + cache_installed_packages(cluster) + + self.assertEqual(default_containerd, + package_associations(cluster.inventory, 'debian', 'containerd')['package_name'][0], + "containerd should be default because caching versions is off") + + finalized_inventory = make_finalized_inventory(cluster) + self.assertEqual(expected_containerd, + package_associations(finalized_inventory, 'debian', 'containerd')['package_name'][0], + "containerd was not detected") + + def test_cache_versions_specific_off(self): + default_containerd = 
get_compiled_defaults()['debian']['containerd']['package_name'][0] + default_haproxy = get_compiled_defaults()['debian']['haproxy']['package_name'] + + self._stub_associations_packages({ + 'containerd': {host: 'containerd=1.5.9-0ubuntu1~20.04.4' for host in self.initial_hosts}, + 'auditd': {host: 'auditd=1:2.8.5-2ubuntu6' for host in self.initial_hosts}, + 'haproxy': {host: 'haproxy=2.0.29-0ubuntu1' for host in self.initial_hosts}, + }) + + set_cache_versions_false(self.inventory, None, 'containerd') + set_cache_versions_false(self.inventory, 'debian', 'haproxy') + cluster = self._new_cluster() + cache_installed_packages(cluster) + + self.assertEqual(default_containerd, + package_associations(cluster.inventory, 'debian', 'containerd')['package_name'][0], + "containerd should be default because caching versions is off") + self.assertEqual('auditd=1:2.8.5-2ubuntu6', + package_associations(cluster.inventory, 'debian', 'audit')['package_name'], + "auditd was not detected") + self.assertEqual(default_haproxy, + package_associations(cluster.inventory, 'debian', 'haproxy')['package_name'], + "haproxy should be default because caching versions is off") + + finalized_inventory = make_finalized_inventory(cluster) + self.assertEqual('containerd=1.5.9-0ubuntu1~20.04.4', + package_associations(finalized_inventory, 'debian', 'containerd')['package_name'][0], + "containerd was not detected") + self.assertEqual('auditd=1:2.8.5-2ubuntu6', + package_associations(finalized_inventory, 'debian', 'audit')['package_name'], + "auditd was not detected") + self.assertEqual('haproxy=2.0.29-0ubuntu1', + package_associations(finalized_inventory, 'debian', 'haproxy')['package_name'], + "haproxy was not detected") + + def test_add_node_fails_different_package_versions(self): + self._stub_associations_packages({ + 'containerd': { + self.initial_hosts[0]: 'containerd=1.5.9-0ubuntu1~20.04.4', + self.initial_hosts[1]: 'containerd=2', + }, + }) + + cluster = self._new_cluster() + expected_error_regex 
= packages.ERROR_MULTIPLE_PACKAGE_VERSIONS_DETECTED.replace('%s', '.*') + with self.assertRaisesRegex(Exception, expected_error_regex): + cache_installed_packages(cluster) + + def test_finalize_inventory_different_package_versions(self): + default_containerd = get_compiled_defaults()['debian']['containerd']['package_name'][0] + + self._stub_associations_packages({ + 'containerd': { + self.initial_hosts[0]: 'containerd=1.5.9-0ubuntu1~20.04.4', + self.initial_hosts[1]: 'containerd=2', + }, + 'auditd': {host: 'auditd=1:2.8.5-2ubuntu6' for host in self.initial_hosts}, + }) + self._stub_custom_packages({ + 'curl=7.*': { + self.initial_hosts[0]: 'curl=7.68.0-1ubuntu2.14', + self.initial_hosts[1]: 'curl=2', + }, + 'unzip=6.*': {host: 'unzip=6.0-25ubuntu1.1' for host in self.hosts}, + }) + + cluster = self._new_cluster() + + finalized_inventory = make_finalized_inventory(cluster) + self.assertEqual(default_containerd, + package_associations(finalized_inventory, 'debian', 'containerd')['package_name'][0], + "containerd should be default because multiple versions are installed") + self.assertEqual('auditd=1:2.8.5-2ubuntu6', + package_associations(finalized_inventory, 'debian', 'audit')['package_name'], + "auditd was not detected") + self.assertEqual({'curl=7.*', 'unzip=6.0-25ubuntu1.1'}, set(self._packages_include(finalized_inventory)), + "Custom packages versions should be partially detected in finalized inventory") + + def test_not_cache_versions_if_multiple_os_family_versions(self): + default_containerd = get_compiled_defaults()['debian']['containerd']['package_name'][0] + + self._stub_associations_packages({ + 'containerd': {host: 'containerd=1.5.9-0ubuntu1~20.04.4' for host in self.initial_hosts}, + }) + self._stub_custom_packages({ + 'curl=7.*': {host: 'curl=7.68.0-1ubuntu2.14' for host in self.hosts}, + }) + + self.context['nodes'][self.new_host]['os']['version'] = '22.04' + cluster = self._new_cluster() + cache_installed_packages(cluster) + + 
self.assertEqual(default_containerd, + package_associations(cluster.inventory, 'debian', 'containerd')['package_name'][0], + "containerd should be default because multiple OS versions are detected") + + finalized_inventory = make_finalized_inventory(cluster) + self.assertEqual(default_containerd, + package_associations(finalized_inventory, 'debian', 'containerd')['package_name'][0], + "containerd should be default because multiple OS versions are detected") + self.assertEqual({'curl=7.*'}, set(self._packages_include(finalized_inventory)), + "Custom packages should be default because multiple OS versions are detected") diff --git a/test/unit/test_upgrade.py b/test/unit/test_upgrade.py index 2ae322cb3..7822fa642 100755 --- a/test/unit/test_upgrade.py +++ b/test/unit/test_upgrade.py @@ -15,8 +15,10 @@ import unittest +from copy import deepcopy from kubemarine import kubernetes +from kubemarine.core import utils from kubemarine.procedures import upgrade from kubemarine import demo @@ -83,45 +85,88 @@ def test_upgrade_plan_sort(self): ], result) +def generate_upgrade_environment(old, new) -> (dict, dict): + inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) + inventory['services']['kubeadm'] = { + 'kubernetesVersion': old + } + context = demo.create_silent_context(procedure='upgrade') + context['upgrade_version'] = new + return inventory, context + + class UpgradeDefaultsEnrichment(unittest.TestCase): def prepare_cluster(self, old, new): - inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED) - inventory['services']['kubeadm'] = { - 'kubernetesVersion': old - } - cluster = demo.new_cluster(inventory) - cluster.context['upgrade_version'] = new - cluster.context['initial_procedure'] = 'upgrade' + inventory, context = generate_upgrade_environment(old, new) + upgrade = {'upgrade_plan': [new]} + cluster = demo.new_cluster(inventory, procedure_inventory=upgrade, context=context) return cluster def test_correct_inventory(self): - old_kubernetes_version = 
'v1.22.2' - new_kubernetes_version = 'v1.22.10' + old_kubernetes_version = 'v1.24.0' + new_kubernetes_version = 'v1.24.2' cluster = self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) - cluster._inventory = kubernetes.enrich_upgrade_inventory(cluster.inventory, cluster) self.assertEqual(new_kubernetes_version, cluster.inventory['services']['kubeadm']['kubernetesVersion']) def test_incorrect_inventory_high_range(self): - old_kubernetes_version = 'v1.22.2' - new_kubernetes_version = 'v1.28.2' - cluster = self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) - with self.assertRaises(Exception): - kubernetes.enrich_upgrade_inventory(cluster.inventory, cluster) + old_kubernetes_version = 'v1.22.9' + new_kubernetes_version = 'v1.24.2' + with self.assertRaisesRegex(Exception, kubernetes.ERROR_MINOR_RANGE_EXCEEDED + % (old_kubernetes_version, new_kubernetes_version)): + self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) def test_incorrect_inventory_downgrade(self): - old_kubernetes_version = 'v1.22.2' - new_kubernetes_version = 'v1.18.4' - cluster = self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) - with self.assertRaises(Exception): - kubernetes.enrich_upgrade_inventory(cluster.inventory, cluster) + old_kubernetes_version = 'v1.24.2' + new_kubernetes_version = 'v1.22.9' + with self.assertRaisesRegex(Exception, kubernetes.ERROR_DOWNGRADE + % (old_kubernetes_version, new_kubernetes_version)): + self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) def test_incorrect_inventory_same_version(self): - old_kubernetes_version = 'v1.22.2' - new_kubernetes_version = 'v1.22.2' - cluster = self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) - with self.assertRaises(Exception): - kubernetes.enrich_upgrade_inventory(cluster.inventory, cluster) + old_kubernetes_version = 'v1.24.2' + new_kubernetes_version = 'v1.24.2' + with self.assertRaisesRegex(Exception, kubernetes.ERROR_SAME + % 
(old_kubernetes_version, new_kubernetes_version)): + self.prepare_cluster(old_kubernetes_version, new_kubernetes_version) + + +class UpgradePackagesEnrichment(unittest.TestCase): + def prepare_procedure_inventory(self, new): + return { + 'upgrade_plan': [new], + new: { + 'packages': { + 'associations': { + 'docker': { + 'package_name': 'docker-ce' + } + }, + 'install': ['curl'] + } + } + } + + def test_enrich_packages_propagate_associations(self): + old = 'v1.24.0' + new = 'v1.24.2' + inventory, context = generate_upgrade_environment(old, new) + upgrade = self.prepare_procedure_inventory(new) + cluster = demo.new_cluster(inventory, procedure_inventory=upgrade, context=context) + self.assertEqual(['curl'], cluster.inventory['services']['packages']['install']['include'], + "Custom packages are enriched incorrectly") + self.assertEqual('docker-ce', cluster.inventory['services']['packages']['associations']['rhel']['docker']['package_name'], + "Associations packages are enriched incorrectly") + + def test_final_inventory_enrich_global(self): + old = 'v1.24.0' + new = 'v1.24.2' + inventory, context = generate_upgrade_environment(old, new) + upgrade = self.prepare_procedure_inventory(new) + cluster = demo.new_cluster(deepcopy(inventory), procedure_inventory=deepcopy(upgrade), context=context) + final_inventory = utils.get_final_inventory(cluster, inventory) + self.assertEqual(upgrade[new]['packages'], final_inventory['services']['packages'], + "Final inventory is recreated incorrectly") if __name__ == '__main__':