Skip to content

Commit

Permalink
Implemented packages caching in finalized inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
ilia1243 committed Nov 9, 2022
1 parent b3f470e commit dc62fdc
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 139 deletions.
89 changes: 1 addition & 88 deletions kubemarine/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,100 +327,13 @@ def get_package_association_str_for_group(self, group: NodeGroup,
return results_values[0]
raise Exception(f'Too many values returned for package associations str "{association_key}" for package "{package}"')

def cache_package_versions(self):
# todo not cache twice for add_node procedure
# Cache packages only if it's set in configuration
if not self.inventory['services']['packages']['cache_versions']:
self.log.debug("Skip caching of package versions as it is manually disabled")
return

os_ids = self.get_os_identifiers()
different_os = list(set(os_ids.values()))
if len(different_os) > 1:
self.log.debug(f"Final nodes have different OS families or versions, packages will not be cached. "
f"List of (OS family, version): {different_os}")
return

os_family = different_os[0][0]
if os_family in ('unknown', 'unsupported'):
# For add_node/install procedures we check that OS is supported in prepare.check.system task.
# For check_iaas procedure it is allowed to have unsupported OS, so skip caching.
self.log.debug("Skip caching of packages for unsupported OS.")
return

nodes_cache_versions = self.nodes['all'].get_final_nodes().get_sudo_nodes()
if nodes_cache_versions.is_empty():
# For add_node/install procedures we check that all nodes are sudoers in prepare.check.sudoer task.
# For check_iaas procedure the nodes might still be not sudoers, so skip caching.
self.log.debug(f"There are no nodes with sudo privileges, packages will not be cached.")
return

self._detect_and_cache_package_versions_by_group(nodes_cache_versions)
self.log.debug('Package versions detection finished')

def _detect_and_cache_package_versions_by_group(self, group: NodeGroup):
from kubemarine import packages
detected_packages = packages.detect_installed_packages_version_groups(group)

for association_name, associated_params in self.inventory['services']['packages']['associations'].items():
indexed_by_pure_packages = packages.get_indexed_by_pure_packages_for_association(group, association_name)
if not indexed_by_pure_packages:
continue

final_packages_list = []

for package in indexed_by_pure_packages.keys():
detected_package_versions = list(detected_packages[package].keys())
installed_packages = []
for version in detected_package_versions:
# add package version to list only if it was found as installed
if "not installed" not in version:
installed_packages.append(version)

# if there no versions detected, then set package version to default
if not installed_packages:
final_packages_list.append(indexed_by_pure_packages[package])
else:
# todo what if installed with different versions?
final_packages_list.extend(installed_packages)

# if non-multiple value, then convert to simple string
# packages can contain multiple package values, like docker package
# (it has docker-ce, docker-cli and containerd.io packages for installation)
if len(final_packages_list) == 1:
final_packages_list = final_packages_list[0]
else:
final_packages_list = list(set(final_packages_list))

associated_params['package_name'] = final_packages_list
# packages from direct installation section
if self.inventory['services']['packages'].get('install', {}):
final_packages_list = []
for package in self.inventory['services']['packages']['install']['include']:
package_versions_list = []
if package in self.globals['compatibility_map']['software']:
detected_package_versions = list(detected_packages[package].keys())
for version in detected_package_versions:
# skip version, which ended with special symbol = or -
# (it is possible in some cases)
if "not installed" not in version and version[-1] != '=' and version[-1] != '-':
# add package version to list only if it was found as installed
package_versions_list.append(version)
# if there no versions detected, then set package version to default
if not package_versions_list:
package_versions_list = [package]
final_packages_list = final_packages_list + package_versions_list
self.inventory['services']['packages']['install']['include'] = list(set(final_packages_list))
return detected_packages

def dump_finalized_inventory(self):
self.cache_package_versions()

from kubemarine.core import defaults
from kubemarine.procedures import remove_node
from kubemarine import controlplane, cri, packages

cluster_finalized_functions = {
packages.cache_package_versions,
packages.remove_unused_os_family_associations,
cri.remove_invalid_cri_config,
remove_node.remove_node_finalize_inventory,
Expand Down
187 changes: 139 additions & 48 deletions kubemarine/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,130 @@ def enrich_inventory_associations(inventory, cluster: KubernetesCluster):
return inventory


def cache_package_versions(cluster: KubernetesCluster, inventory: dict, ensured_associations_only=False) -> dict:
os_ids = cluster.get_os_identifiers()
different_os = list(set(os_ids.values()))
if len(different_os) > 1:
cluster.log.debug(f"Final nodes have different OS families or versions, packages will not be cached. "
f"List of (OS family, version): {different_os}")
return inventory

os_family = different_os[0][0]
if os_family in ('unknown', 'unsupported'):
# For add_node/install procedures we check that OS is supported in prepare.check.system task.
# For check_iaas procedure it is allowed to have unsupported OS, so skip caching.
cluster.log.debug("Skip caching of packages for unsupported OS.")
return inventory

nodes_cache_versions = cluster.nodes['all'].get_final_nodes().get_sudo_nodes()
if nodes_cache_versions.is_empty():
# For add_node/install procedures we check that all nodes are sudoers in prepare.check.sudoer task.
# For check_iaas procedure the nodes might still be not sudoers, so skip caching.
cluster.log.debug(f"There are no nodes with sudo privileges, packages will not be cached.")
return inventory

associations_packages = _get_associations_packages_to_cache(inventory, nodes_cache_versions, ensured_associations_only)
packages_list = _get_packages_to_detect_versions(inventory, associations_packages, ensured_associations_only)

detected_packages = detect_installed_packages_version_groups(nodes_cache_versions, packages_list)

_cache_package_associations(cluster, inventory, associations_packages, detected_packages, ensured_associations_only)
_cache_custom_packages(cluster, inventory, detected_packages, ensured_associations_only)

cluster.log.debug('Package versions detection finished')
return inventory


def _get_associations_packages_to_cache(inventory: dict, group: NodeGroup, ensured_association_only: bool) \
-> Dict[str, Dict[str, str]]:

associations_packages: Dict[str, Dict[str, str]] = {}
global_cache_versions = inventory['services']['packages']['cache_versions']
for association_name, associated_params in inventory['services']['packages']['associations'].items():
pkgs = get_indexed_by_pure_packages_for_association(group, association_name)
if not pkgs:
continue
if not ensured_association_only or (global_cache_versions and associated_params.get('cache_versions', True)):
associations_packages[association_name] = pkgs

return associations_packages


def _get_packages_to_detect_versions(inventory: dict, associations_packages: Dict[str, Dict[str, str]],
ensured_association_only: bool) -> list:
packages_list = []
for pure_packages in associations_packages.values():
packages_list.extend(pure_packages.keys())

if not ensured_association_only and inventory['services']['packages'].get('install', {}):
packages_list.extend(inventory['services']['packages']['install']['include'])

return packages_list


def _cache_package_associations(cluster: KubernetesCluster, inventory: dict, associations_packages: Dict[str, Dict[str, str]],
detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool):
for association_name, associated_params in inventory['services']['packages']['associations'].items():
if association_name not in associations_packages:
continue

pure_packages = associations_packages[association_name]
final_packages_list = []

for package in pure_packages.keys():
final_package = _detect_final_package(cluster,
detected_packages, package, pure_packages[package], ensured_association_only)
final_packages_list.append(final_package)

# if non-multiple value, then convert to simple string
# packages can contain multiple package values, like docker package
# (it has docker-ce, docker-cli and containerd.io packages for installation)
if len(final_packages_list) == 1:
final_packages_list = final_packages_list[0]

associated_params['package_name'] = final_packages_list


def _cache_custom_packages(cluster: KubernetesCluster, inventory: dict,
detected_packages: Dict[str, Dict[str, List]], ensured_association_only: bool):
if ensured_association_only:
return
# packages from direct installation section
custom_install_packages = inventory['services']['packages'].get('install', {})
if custom_install_packages:
final_packages_list = []
for package in custom_install_packages['include']:
final_package = _detect_final_package(cluster, detected_packages, package, package, False)
final_packages_list.append(final_package)
custom_install_packages['include'] = final_packages_list
return detected_packages


def _detect_final_package(cluster: KubernetesCluster, detected_packages: Dict[str, Dict[str, List]],
package, default_package, ensured_association_only: bool):
# add package version to list only if it was found as installed
detected_package_versions = list(filter(lambda version: "not installed" not in version,
detected_packages[package].keys()))

# if there no versions detected, then return default package from inventory
if not detected_package_versions:
return default_package
elif len(detected_package_versions) > 1:
if ensured_association_only:
raise Exception(
f"Multiple package versions detected {detected_packages[package]} for package '{package}'. "
f"Align them to the single version manually or using corresponding task of install procedure. "
f"Alternatively, specify cache_versions=false in corresponding association.")
else:
cluster.log.warning(
f"Multiple package versions detected {detected_packages[package]} for package '{package}'. "
f"Use default package '{default_package}' from inventory.")
# return default package from inventory if multiple versions detected
return default_package
else:
return detected_package_versions[0]


def remove_unused_os_family_associations(cluster: KubernetesCluster, inventory: dict):
final_nodes = cluster.nodes['all'].get_final_nodes()
for os_family in get_associations_os_family_keys():
Expand Down Expand Up @@ -153,78 +277,45 @@ def detect_installed_package_version(group: NodeGroup, package: str, warn=True)
return group.sudo(cmd)


def detect_installed_packages_versions(group: NodeGroup, packages_list: List or str = None) -> Dict[str, NodeGroupResult]:
def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str) -> Dict[str, Dict[str, List]]:
"""
Detect packages versions for each host on remote group from specified list of packages
Detect grouped packages versions on remote group from specified list of packages.
:param group: Group of nodes, where packages should be found
:param packages_list: Single package or list of packages, which versions should be detected. If packages list empty,
then packages will be automatically added from services.packages.associations and services.packages.install.include
:return: Dictionary with NodeGroupResults for each queried package, e.g. "foo" -> {1.1.1.1:"foo-1", 1.1.1.2:"foo-2"}
:param packages_list: Single package or list of packages, which versions should be detected.
:return: Dictionary with grouped versions for each queried package, pointing to list of hosts,
e.g. {"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}}
"""

cluster = group.cluster
# todo skip detection of cache_versions=false packages from outside
excluded_dict = {}

if packages_list is None:
packages_list = []
# packages from associations
for association_name, associated_params in cluster.inventory['services']['packages']['associations'].items():
indexed_by_pure_packages = get_indexed_by_pure_packages_for_association(group, association_name)
if not indexed_by_pure_packages:
continue
packages_list.extend(indexed_by_pure_packages.keys())
if not associated_params.get('cache_versions', True):
# replace packages with associated version that should be excluded from cache
excluded_dict.update(indexed_by_pure_packages)

if isinstance(packages_list, str):
packages_list = [packages_list]
# deduplicate
packages_list = list(set(packages_list))
if not packages_list:
return {}

with RemoteExecutor(cluster) as exe:
for package in packages_list:
detect_installed_package_version(group, package, warn=True)

raw_result = exe.get_last_results()
results: dict[str, NodeGroupResult] = {}
results: Dict[str, Dict[str, List]] = {}

for i, package in enumerate(packages_list):
results[package] = NodeGroupResult(cluster)
for host, multiple_results in raw_result.items():
detected_grouped_packages = {}
for conn, multiple_results in raw_result.items():
node_detected_package = multiple_results[i].stdout.strip() + multiple_results[i].stderr.strip()
# consider version, which ended with special symbol = or - as not installed
# (it is possible in some cases to receive "containerd=" version)
if "not installed" in node_detected_package or "no packages found" in node_detected_package \
or node_detected_package[-1] == '=' or node_detected_package[-1] == '-':
node_detected_package = f"not installed {package}"
elif package in excluded_dict.keys():
node_detected_package = excluded_dict[package]
results[package][host] = node_detected_package
detected_grouped_packages.setdefault(node_detected_package, []).append(conn.host)

return results
results[package] = detected_grouped_packages


def detect_installed_packages_version_groups(group: NodeGroup, packages_list: List or str = None) \
-> Dict[str, Dict[str, List]]:
"""
Detect grouped packages versions on remote group from specified list of packages.
:param group: Group of nodes, where packages should be found
:param packages_list: Single package or list of packages, which versions should be detected. If packages list empty,
then packages will be automatically added from services.packages.associations and services.packages.install.include
:return: Dictionary with grouped versions for each queried package, pointing to list of hosts,
e.g. {"foo" -> {"foo-1": [host1, host2]}, "bar" -> {"bar-1": [host1], "bar-2": [host2]}}
"""

detected_packages = detect_installed_packages_versions(group, packages_list)
grouped_packages: Dict[str, Dict[str, List]] = {}
for queried_package, detected_packages_results in detected_packages.items():
detected_grouped_packages = {}
for host, pckg in detected_packages_results.items():
detected_grouped_packages.setdefault(pckg, []).append(host)

grouped_packages[queried_package] = detected_grouped_packages

return grouped_packages
return results


def get_package_name(os_family: str, package: str) -> str:
Expand Down
7 changes: 4 additions & 3 deletions kubemarine/procedures/add_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import copy

from collections import OrderedDict
from kubemarine import kubernetes, system
from kubemarine import kubernetes, packages
from kubemarine.core import flow, utils
from kubemarine.core.action import Action
from kubemarine.core.cluster import KubernetesCluster
from kubemarine.core.resources import DynamicResources
from kubemarine.procedures import install

Expand Down Expand Up @@ -92,13 +93,13 @@ def add_node_finalize_inventory(cluster, inventory_to_finalize):
return inventory_to_finalize


def cache_installed_packages(cluster):
def cache_installed_packages(cluster: KubernetesCluster):
"""
Task which is used to collect already installed packages versions on already existing nodes.
It is called first during "add_node" procedure,
so that new nodes install exactly the same packages as on other already existing nodes.
"""
cluster.cache_package_versions()
packages.cache_package_versions(cluster, cluster.inventory, ensured_associations_only=True)


tasks = OrderedDict(copy.deepcopy(install.tasks))
Expand Down

0 comments on commit dc62fdc

Please sign in to comment.