From dc62fdc94bec58d72637361de630d0d420dada40 Mon Sep 17 00:00:00 2001 From: ilia1243 <8808144+ilia1243@users.noreply.github.com> Date: Wed, 9 Nov 2022 21:09:55 +0300 Subject: [PATCH] Implemented packages caching in finalized inventory --- kubemarine/core/cluster.py | 89 +------------- kubemarine/packages.py | 187 ++++++++++++++++++++++-------- kubemarine/procedures/add_node.py | 7 +- 3 files changed, 144 insertions(+), 139 deletions(-) diff --git a/kubemarine/core/cluster.py b/kubemarine/core/cluster.py index 82cad78c9..e8d1b52d0 100755 --- a/kubemarine/core/cluster.py +++ b/kubemarine/core/cluster.py @@ -327,100 +327,13 @@ def get_package_association_str_for_group(self, group: NodeGroup, return results_values[0] raise Exception(f'Too many values returned for package associations str "{association_key}" for package "{package}"') - def cache_package_versions(self): - # todo not cache twice for add_node procedure - # Cache packages only if it's set in configuration - if not self.inventory['services']['packages']['cache_versions']: - self.log.debug("Skip caching of package versions as it is manually disabled") - return - - os_ids = self.get_os_identifiers() - different_os = list(set(os_ids.values())) - if len(different_os) > 1: - self.log.debug(f"Final nodes have different OS families or versions, packages will not be cached. " - f"List of (OS family, version): {different_os}") - return - - os_family = different_os[0][0] - if os_family in ('unknown', 'unsupported'): - # For add_node/install procedures we check that OS is supported in prepare.check.system task. - # For check_iaas procedure it is allowed to have unsupported OS, so skip caching. - self.log.debug("Skip caching of packages for unsupported OS.") - return - - nodes_cache_versions = self.nodes['all'].get_final_nodes().get_sudo_nodes() - if nodes_cache_versions.is_empty(): - # For add_node/install procedures we check that all nodes are sudoers in prepare.check.sudoer task. - # For check_iaas procedure the nodes might still be not sudoers, so skip caching. - self.log.debug(f"There are no nodes with sudo privileges, packages will not be cached.") - return - - self._detect_and_cache_package_versions_by_group(nodes_cache_versions) - self.log.debug('Package versions detection finished') - - def _detect_and_cache_package_versions_by_group(self, group: NodeGroup): - from kubemarine import packages - detected_packages = packages.detect_installed_packages_version_groups(group) - - for association_name, associated_params in self.inventory['services']['packages']['associations'].items(): - indexed_by_pure_packages = packages.get_indexed_by_pure_packages_for_association(group, association_name) - if not indexed_by_pure_packages: - continue - - final_packages_list = [] - - for package in indexed_by_pure_packages.keys(): - detected_package_versions = list(detected_packages[package].keys()) - installed_packages = [] - for version in detected_package_versions: - # add package version to list only if it was found as installed - if "not installed" not in version: - installed_packages.append(version) - - # if there no versions detected, then set package version to default - if not installed_packages: - final_packages_list.append(indexed_by_pure_packages[package]) - else: - # todo what if installed with different versions? - final_packages_list.extend(installed_packages) - - # if non-multiple value, then convert to simple string - # packages can contain multiple package values, like docker package - # (it has docker-ce, docker-cli and containerd.io packages for installation) - if len(final_packages_list) == 1: - final_packages_list = final_packages_list[0] - else: - final_packages_list = list(set(final_packages_list)) - - associated_params['package_name'] = final_packages_list - # packages from direct installation section - if self.inventory['services']['packages'].get('install', {}): - final_packages_list = [] - for package in self.inventory['services']['packages']['install']['include']: - package_versions_list = [] - if package in self.globals['compatibility_map']['software']: - detected_package_versions = list(detected_packages[package].keys()) - for version in detected_package_versions: - # skip version, which ended with special symbol = or - - # (it is possible in some cases) - if "not installed" not in version and version[-1] != '=' and version[-1] != '-': - # add package version to list only if it was found as installed - package_versions_list.append(version) - # if there no versions detected, then set package version to default - if not package_versions_list: - package_versions_list = [package] - final_packages_list = final_packages_list + package_versions_list - self.inventory['services']['packages']['install']['include'] = list(set(final_packages_list)) - return detected_packages - def dump_finalized_inventory(self): - self.cache_package_versions() - from kubemarine.core import defaults from kubemarine.procedures import remove_node from kubemarine import controlplane, cri, packages cluster_finalized_functions = { + packages.cache_package_versions, packages.remove_unused_os_family_associations, cri.remove_invalid_cri_config, remove_node.remove_node_finalize_inventory, diff --git a/kubemarine/packages.py b/kubemarine/packages.py index 5bd194ca6..5458adf1b 100644 --- a/kubemarine/packages.py +++ b/kubemarine/packages.py @@ -47,6 +47,130 @@ def enrich_inventory_associations(inventory, cluster: KubernetesCluster): return inventory +def cache_package_versions(cluster: KubernetesCluster, inventory: dict, ensured_associations_only=False) -> dict: + os_ids = cluster.get_os_identifiers() + different_os = list(set(os_ids.values())) + if len(different_os) > 1: + cluster.log.debug(f"Final nodes have different OS families or versions, packages will not be cached. " + f"List of (OS family, version): {different_os}") + return inventory + + os_family = different_os[0][0] + if os_family in ('unknown', 'unsupported'): + # For add_node/install procedures we check that OS is supported in prepare.check.system task. + # For check_iaas procedure it is allowed to have unsupported OS, so skip caching. + cluster.log.debug("Skip caching of packages for unsupported OS.") + return inventory + + nodes_cache_versions = cluster.nodes['all'].get_final_nodes().get_sudo_nodes() + if nodes_cache_versions.is_empty(): + # For add_node/install procedures we check that all nodes are sudoers in prepare.check.sudoer task. + # For check_iaas procedure the nodes might still be not sudoers, so skip caching. + cluster.log.debug(f"There are no nodes with sudo privileges, packages will not be cached.") + return inventory + + associations_packages = _get_associations_packages_to_cache(inventory, nodes_cache_versions, ensured_associations_only) + packages_list = _get_packages_to_detect_versions(inventory, associations_packages, ensured_associations_only) + + detected_packages = detect_installed_packages_version_groups(nodes_cache_versions, packages_list) + + _cache_package_associations(cluster, inventory, associations_packages, detected_packages, ensured_associations_only) + _cache_custom_packages(cluster, inventory, detected_packages, ensured_associations_only) + + cluster.log.debug('Package versions detection finished') + return inventory + + +def _get_associations_packages_to_cache(inventory: dict, group: NodeGroup, ensured_association_only: bool) \ + -> Dict[str, Dict[str, str]]: + + associations_packages: Dict[str, Dict[str, str]] = {} + global_cache_versions = inventory['services']['packages']['cache_versions'] + for association_name, associated_params in inventory['services']['packages']['associations'].items(): + pkgs = get_indexed_by_pure_packages_for_association(group, association_name) + if not pkgs: + continue + if not ensured_association_only or (global_cache_versions and associated_params.get('cache_versions', True)): + associations_packages[association_name] = pkgs + + return associations_packages + + +def _get_packages_to_detect_versions(inventory: dict, associations_packages: Dict[str, Dict[str, str]], + ensured_association_only: bool) -> list: + packages_list = [] + for pure_packages in associations_packages.values(): + packages_list.extend(pure_packages.keys()) + + if not ensured_association_only and inventory['services']['packages'].get('install', {}): + packages_list.extend(inventory['services']['packages']['install']['include']) + + return packages_list + + +def _cache_package_associations(cluster: KubernetesCluster, inventory: dict, associations_packages: Dict[str, Dict[str, str]], + detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool): + for association_name, associated_params in inventory['services']['packages']['associations'].items(): + if association_name not in associations_packages: + continue + + pure_packages = associations_packages[association_name] + final_packages_list = [] + + for package in pure_packages.keys(): + final_package = _detect_final_package(cluster, + detected_packages, package, pure_packages[package], ensured_association_only) + final_packages_list.append(final_package) + + # if non-multiple value, then convert to simple string + # packages can contain multiple package values, like docker package + # (it has docker-ce, docker-cli and containerd.io packages for installation) + if len(final_packages_list) == 1: + final_packages_list = final_packages_list[0] + + associated_params['package_name'] = final_packages_list + + +def _cache_custom_packages(cluster: KubernetesCluster, inventory: dict, + detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool): + if ensured_association_only: + return + # packages from direct installation section + custom_install_packages = inventory['services']['packages'].get('install', {}) + if custom_install_packages: + final_packages_list = [] + for package in custom_install_packages['include']: + final_package = _detect_final_package(cluster, detected_packages, package, package, False) + final_packages_list.append(final_package) + custom_install_packages['include'] = final_packages_list + return detected_packages + + +def _detect_final_package(cluster: KubernetesCluster, detected_packages: Dict[str, Dict[str, List]], + package, default_package, ensured_association_only: bool): + # add package version to list only if it was found as installed + detected_package_versions = list(filter(lambda version: "not installed" not in version, + detected_packages[package].keys())) + + # if there no versions detected, then return default package from inventory + if not detected_package_versions: + return default_package + elif len(detected_package_versions) > 1: + if ensured_association_only: + raise Exception( + f"Multiple package versions detected {detected_packages[package]} for package '{package}'. " + f"Align them to the single version manually or using corresponding task of install procedure. " + f"Alternatively, specify cache_versions=false in corresponding association.") + else: + cluster.log.warning( + f"Multiple package versions detected {detected_packages[package]} for package '{package}'. " + f"Use default package '{default_package}' from inventory.") + # return default package from inventory if multiple versions detected + return default_package + else: + return detected_package_versions[0] + + def remove_unused_os_family_associations(cluster: KubernetesCluster, inventory: dict): final_nodes = cluster.nodes['all'].get_final_nodes() for os_family in get_associations_os_family_keys(): @@ -153,78 +277,45 @@ def detect_installed_package_version(group: NodeGroup, package: str, warn=True) return group.sudo(cmd) -def detect_installed_packages_versions(group: NodeGroup, packages_list: List or str = None) -> Dict[str, NodeGroupResult]: +def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str) -> Dict[str, Dict[str, List]]: """ - Detect packages versions for each host on remote group from specified list of packages + Detect grouped packages versions on remote group from specified list of packages. :param group: Group of nodes, where packages should be found - :param packages_list: Single package or list of packages, which versions should be detected. If packages list empty, - then packages will be automatically added from services.packages.associations and services.packages.install.include - :return: Dictionary with NodeGroupResults for each queried package, e.g. "foo" -> {1.1.1.1:"foo-1", 1.1.1.2:"foo-2"} + :param packages_list: Single package or list of packages, which versions should be detected. + :return: Dictionary with grouped versions for each queried package, pointing to list of hosts, + e.g. {"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}} """ cluster = group.cluster - # todo skip detection of cache_versions=false packages from outside - excluded_dict = {} - - if packages_list is None: - packages_list = [] - # packages from associations - for association_name, associated_params in cluster.inventory['services']['packages']['associations'].items(): - indexed_by_pure_packages = get_indexed_by_pure_packages_for_association(group, association_name) - if not indexed_by_pure_packages: - continue - packages_list.extend(indexed_by_pure_packages.keys()) - if not associated_params.get('cache_versions', True): - # replace packages with associated version that should be excluded from cache - excluded_dict.update(indexed_by_pure_packages) + if isinstance(packages_list, str): + packages_list = [packages_list] # deduplicate packages_list = list(set(packages_list)) + if not packages_list: + return {} with RemoteExecutor(cluster) as exe: for package in packages_list: detect_installed_package_version(group, package, warn=True) raw_result = exe.get_last_results() - results: dict[str, NodeGroupResult] = {} + results: Dict[str, Dict[str, List]] = {} for i, package in enumerate(packages_list): - results[package] = NodeGroupResult(cluster) - for host, multiple_results in raw_result.items(): + detected_grouped_packages = {} + for conn, multiple_results in raw_result.items(): node_detected_package = multiple_results[i].stdout.strip() + multiple_results[i].stderr.strip() # consider version, which ended with special symbol = or - as not installed # (it is possible in some cases to receive "containerd=" version) if "not installed" in node_detected_package or "no packages found" in node_detected_package \ or node_detected_package[-1] == '=' or node_detected_package[-1] == '-': node_detected_package = f"not installed {package}" - elif package in excluded_dict.keys(): - node_detected_package = excluded_dict[package] - results[package][host] = node_detected_package + detected_grouped_packages.setdefault(node_detected_package, []).append(conn.host) - return results + results[package] = detected_grouped_packages - -def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str = None) \ - -> Dict[str, Dict[str, List]]: - """ - Detect grouped packages versions on remote group from specified list of packages. - :param group: Group of nodes, where packages should be found - :param packages_list: Single package or list of packages, which versions should be detected. If packages list empty, - then packages will be automatically added from services.packages.associations and services.packages.install.include - :return: Dictionary with grouped versions for each queried package, pointing to list of hosts, - e.g. {"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}} - """ - - detected_packages = detect_installed_packages_versions(group, packages_list) - grouped_packages: Dict[str, Dict[str, List]] = {} - for queried_package, detected_packages_results in detected_packages.items(): - detected_grouped_packages = {} - for host, pckg in detected_packages_results.items(): - detected_grouped_packages.setdefault(pckg, []).append(host) - - grouped_packages[queried_package] = detected_grouped_packages - - return grouped_packages + return results def get_package_name(os_family: str, package: str) -> str: diff --git a/kubemarine/procedures/add_node.py b/kubemarine/procedures/add_node.py index a819c0ff7..a257a77f6 100755 --- a/kubemarine/procedures/add_node.py +++ b/kubemarine/procedures/add_node.py @@ -17,9 +17,10 @@ import copy from collections import OrderedDict -from kubemarine import kubernetes, system +from kubemarine import kubernetes, packages from kubemarine.core import flow, utils from kubemarine.core.action import Action +from kubemarine.core.cluster import KubernetesCluster from kubemarine.core.resources import DynamicResources from kubemarine.procedures import install @@ -92,13 +93,13 @@ def add_node_finalize_inventory(cluster, inventory_to_finalize): return inventory_to_finalize -def cache_installed_packages(cluster): +def cache_installed_packages(cluster: KubernetesCluster): """ Task which is used to collect already installed packages versions on already existing nodes. It is called first during "add_node" procedure, so that new nodes install exactly the same packages as on other already existing nodes. """ - cluster.cache_package_versions() + packages.cache_package_versions(cluster, cluster.inventory, ensured_associations_only=True) tasks = OrderedDict(copy.deepcopy(install.tasks))