diff --git a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/actor.py b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/actor.py index 732fba0693..afb1369e35 100644 --- a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/actor.py +++ b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/actor.py @@ -1,6 +1,6 @@ from leapp.actors import Actor from leapp.libraries.actor import forcedefaultboot -from leapp.models import InstalledTargetKernelVersion +from leapp.models import InstalledTargetKernelInfo from leapp.tags import FinalizationPhaseTag, IPUWorkflowTag @@ -14,7 +14,7 @@ class ForceDefaultBootToTargetKernelVersion(Actor): """ name = 'force_default_boot_to_target_kernel_version' - consumes = (InstalledTargetKernelVersion,) + consumes = (InstalledTargetKernelInfo,) produces = () tags = (FinalizationPhaseTag, IPUWorkflowTag) diff --git a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/libraries/forcedefaultboot.py b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/libraries/forcedefaultboot.py index 3124fec40c..b5a4f9d0ea 100644 --- a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/libraries/forcedefaultboot.py +++ b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/libraries/forcedefaultboot.py @@ -1,58 +1,33 @@ -import os -from collections import namedtuple - from leapp.libraries import stdlib from leapp.libraries.common.config import architecture from leapp.libraries.stdlib import api, config -from leapp.models import InstalledTargetKernelVersion - -KernelInfo = namedtuple('KernelInfo', ('kernel_path', 'initrd_path')) - - -def get_kernel_info(message): - kernel_name = 'vmlinuz-{}'.format(message.version) - initrd_name = 'initramfs-{}.img'.format(message.version) - kernel_path = os.path.join('/boot', kernel_name) - initrd_path = os.path.join('/boot', initrd_name) - - target_version_bootable = True - if not os.path.exists(kernel_path): - target_version_bootable = False - api.current_logger().warning('Mandatory kernel %s does not exist', kernel_path) - if not os.path.exists(initrd_path): - target_version_bootable = False - api.current_logger().warning('Mandatory initrd %s does not exist', initrd_path) - - if target_version_bootable: - return KernelInfo(kernel_path=kernel_path, initrd_path=initrd_path) - - api.current_logger().warning('Skipping check due to missing mandatory files') - return None +from leapp.models import InstalledTargetKernelInfo def update_default_kernel(kernel_info): try: - stdlib.run(['grubby', '--info', kernel_info.kernel_path]) + stdlib.run(['grubby', '--info', kernel_info.kernel_img_path]) except stdlib.CalledProcessError: api.current_logger().error('Expected kernel %s to be installed at the boot loader but cannot be found.', - kernel_info.kernel_path) + kernel_info.kernel_img_path) except OSError: api.current_logger().error('Could not check for kernel existence in boot loader. Is grubby installed?') else: try: - stdlib.run(['grubby', '--set-default', kernel_info.kernel_path]) + stdlib.run(['grubby', '--set-default', kernel_info.kernel_img_path]) if architecture.matches_architecture(architecture.ARCH_S390X): # on s390x we need to call zipl explicitly because of issue in grubby, # otherwise the new boot entry will not be set as default # See https://bugzilla.redhat.com/show_bug.cgi?id=1764306 stdlib.run(['/usr/sbin/zipl']) except (OSError, stdlib.CalledProcessError): - api.current_logger().error('Failed to set default kernel to: %s', kernel_info.kernel_path, exc_info=True) + api.current_logger().error('Failed to set default kernel to: %s', + kernel_info.kernel_img_path, exc_info=True) def process(): - if (config.is_debug and not - architecture.matches_architecture(architecture.ARCH_S390X)): # pylint: disable=using-constant-test + is_system_s390x = architecture.matches_architecture(architecture.ARCH_S390X) + if config.is_debug and not is_system_s390x: # pylint: disable=using-constant-test try: # the following command prints output of grubenv for debugging purposes and is repeated after setting # default kernel so we can be sure we have the right saved entry @@ -65,12 +40,18 @@ def process(): stdlib.run(['grub2-editenv', 'list']) except stdlib.CalledProcessError: api.current_logger().error('Failed to execute "grub2-editenv list" command') - message = next(api.consume(InstalledTargetKernelVersion), None) - if not message: + + kernel_info = next(api.consume(InstalledTargetKernelInfo), None) + if not kernel_info: api.current_logger().warning(('Skipped - Forcing checking and setting default boot entry to target kernel' ' version due to missing message')) return + if not kernel_info.kernel_img_path: # Should be always set + api.current_logger().warning(('Skipping forcing of default boot entry - target kernel info ' + 'does not contain a kernel image path.')) + return + try: current_default_kernel = stdlib.run(['grubby', '--default-kernel'])['stdout'].strip() except (OSError, stdlib.CalledProcessError): @@ -84,17 +65,12 @@ def process(): api.current_logger().warning('Failed to query grubby for default {}'.format(type_), exc_info=True) return - kernel_info = get_kernel_info(message) - if not kernel_info: - return - - if current_default_kernel != kernel_info.kernel_path: + if current_default_kernel != kernel_info.kernel_img_path: api.current_logger().warning(('Current default boot entry not target kernel version: Current default: %s.' 'Forcing default kernel to %s'), - current_default_kernel, kernel_info.kernel_path) + current_default_kernel, kernel_info.kernel_img_path) update_default_kernel(kernel_info) - if (config.is_debug and not - architecture.matches_architecture(architecture.ARCH_S390X)): # pylint: disable=using-constant-test + if config.is_debug and not is_system_s390x: # pylint: disable=using-constant-test try: stdlib.run(['grub2-editenv', 'list']) except stdlib.CalledProcessError: diff --git a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/tests/test_forcedefaultboot_forcedefaultboottotargetkernelversion.py b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/tests/test_forcedefaultboot_forcedefaultboottotargetkernelversion.py index 231585df7f..7279c5f22e 100644 --- a/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/tests/test_forcedefaultboot_forcedefaultboottotargetkernelversion.py +++ b/repos/system_upgrade/common/actors/forcedefaultboottotargetkernelversion/tests/test_forcedefaultboot_forcedefaultboottotargetkernelversion.py @@ -8,7 +8,7 @@ from leapp.libraries.common.config import architecture from leapp.libraries.common.testutils import CurrentActorMocked, logger_mocked from leapp.libraries.stdlib import api -from leapp.models import InstalledTargetKernelVersion +from leapp.models import InstalledTargetKernelInfo Expected = namedtuple( 'Expected', ( @@ -19,8 +19,7 @@ Case = namedtuple( 'Case', - ('initrd_exists', - 'kernel_exists', + ('kernel_exists', 'entry_default', 'entry_exists', 'message_available', @@ -28,6 +27,7 @@ ) ) +TARGET_KERNEL_NEVRA = 'kernel-core-1.2.3.4.el8.x86_64' TARGET_KERNEL_VERSION = '1.2.3.4.el8.x86_64' TARGET_KERNEL_TITLE = 'Red Hat Enterprise Linux ({}) 8.1 (Ootpa)'.format(TARGET_KERNEL_VERSION) TARGET_KERNEL_PATH = '/boot/vmlinuz-{}'.format(TARGET_KERNEL_VERSION) @@ -37,48 +37,27 @@ OLD_KERNEL_TITLE = 'Red Hat Enterprise Linux ({}) 7.6 (Maipo)'.format(OLD_KERNEL_VERSION) OLD_KERNEL_PATH = '/boot/vmlinuz-{}'.format(OLD_KERNEL_VERSION) + CASES = ( - (Case(initrd_exists=True, kernel_exists=True, entry_default=True, entry_exists=True, message_available=True, - arch_s390x=False), - Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=False, kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=False), - Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=False), + (Case(kernel_exists=True, entry_default=True, entry_exists=True, message_available=True, arch_s390x=False), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=False, kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=False), + (Case(kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, arch_s390x=False), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=True, message_available=False, - arch_s390x=False), + (Case(kernel_exists=True, entry_default=False, entry_exists=True, message_available=False, arch_s390x=False), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=False, message_available=False, - arch_s390x=False), + (Case(kernel_exists=True, entry_default=False, entry_exists=False, message_available=False, arch_s390x=False), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=False), + (Case(kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, arch_s390x=False), Expected(grubby_setdefault=True, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=True, entry_exists=True, message_available=True, - arch_s390x=True), - Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=False, kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=True), - Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=True), + (Case(kernel_exists=True, entry_default=True, entry_exists=True, message_available=True, arch_s390x=True), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=False, kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=True), + (Case(kernel_exists=False, entry_default=False, entry_exists=True, message_available=True, arch_s390x=True), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=True, message_available=False, - arch_s390x=True), + (Case(kernel_exists=True, entry_default=False, entry_exists=True, message_available=False, arch_s390x=True), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=False, message_available=False, - arch_s390x=True), + (Case(kernel_exists=True, entry_default=False, entry_exists=False, message_available=False, arch_s390x=True), Expected(grubby_setdefault=False, zipl_called=False)), - (Case(initrd_exists=True, kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, - arch_s390x=True), + (Case(kernel_exists=True, entry_default=False, entry_exists=True, message_available=True, arch_s390x=True), Expected(grubby_setdefault=True, zipl_called=True)) ) @@ -143,7 +122,11 @@ def grubby_set_default(self, cmd): def mocked_consume(case): def impl(*args): if case.message_available: - return iter((InstalledTargetKernelVersion(version=TARGET_KERNEL_VERSION),)) + kernel_img_path = TARGET_KERNEL_PATH if case.kernel_exists else '' + msg = InstalledTargetKernelInfo(pkg_nevra=TARGET_KERNEL_NEVRA, + kernel_img_path=kernel_img_path, + initramfs_path=TARGET_INITRD_PATH) + return iter((msg,)) return iter(()) return impl diff --git a/repos/system_upgrade/common/actors/ipuworkflowconfig/libraries/ipuworkflowconfig.py b/repos/system_upgrade/common/actors/ipuworkflowconfig/libraries/ipuworkflowconfig.py index edf978f6ea..9e213f644c 100644 --- a/repos/system_upgrade/common/actors/ipuworkflowconfig/libraries/ipuworkflowconfig.py +++ b/repos/system_upgrade/common/actors/ipuworkflowconfig/libraries/ipuworkflowconfig.py @@ -68,6 +68,7 @@ def produce_ipu_config(actor): flavour = os.environ.get('LEAPP_UPGRADE_PATH_FLAVOUR') target_version = os.environ.get('LEAPP_UPGRADE_PATH_TARGET_RELEASE') os_release = get_os_release('/etc/os-release') + actor.produce(IPUConfig( leapp_env_vars=get_env_vars(), os_release=os_release, diff --git a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/actor.py b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/actor.py index d9ed844d5b..03ce3d9275 100644 --- a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/actor.py +++ b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/actor.py @@ -1,6 +1,6 @@ from leapp.actors import Actor from leapp.libraries.actor import checkinstalledkernels -from leapp.models import InstalledRedHatSignedRPM +from leapp.models import InstalledRedHatSignedRPM, KernelInfo from leapp.reporting import Report from leapp.tags import ChecksPhaseTag, IPUWorkflowTag @@ -30,7 +30,7 @@ class CheckInstalledKernels(Actor): """ name = 'check_installed_kernels' - consumes = (InstalledRedHatSignedRPM,) + consumes = (InstalledRedHatSignedRPM, KernelInfo) produces = (Report,) tags = (IPUWorkflowTag, ChecksPhaseTag) diff --git a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/libraries/checkinstalledkernels.py b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/libraries/checkinstalledkernels.py index 7dc5b2f267..e4f20d27aa 100644 --- a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/libraries/checkinstalledkernels.py +++ b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/libraries/checkinstalledkernels.py @@ -13,36 +13,12 @@ def labelCompare(*args): from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.common.config import architecture, version +from leapp.libraries.common.config import architecture, utils from leapp.libraries.stdlib import api -from leapp.models import InstalledRedHatSignedRPM +from leapp.models import InstalledRedHatSignedRPM, KernelInfo -def get_current_kernel_version(): - """ - Get the version of the running kernel as a string. - """ - return api.current_actor().configuration.kernel.split('-')[0] - - -def get_current_kernel_release(): - """ - Get the release of the current kernel as a string. - """ - return api.current_actor().configuration.kernel.split('-')[1] - - -def get_current_kernel_evr(): - """ - Get a 3-tuple (EVR) of the current booted kernel. - - Epoch in this case is always empty string. In case of kernel, epoch is - never expected to be set. - """ - return ('', get_current_kernel_version(), get_current_kernel_release()) - - -def get_pkgs(pkg_name): +def get_all_pkgs_with_name(pkg_name): """ Get all installed packages of the given name signed by Red Hat. """ @@ -56,17 +32,8 @@ def get_EVR(pkg): Epoch is always set as an empty string as in case of kernel epoch is not expected to be set - ever. - - The release includes an architecture as well. - """ - return ('', pkg.version, '{}.{}'.format(pkg.release, pkg.arch)) - - -def _get_pkgs_evr(pkgs): - """ - Return 3-tuples (EVR) of the given packages. """ - return [get_EVR(pkg) for pkg in pkgs] + return ('', pkg.version, pkg.release) def get_newest_evr(pkgs): @@ -78,35 +45,22 @@ def get_newest_evr(pkgs): """ if not pkgs: return None - rpms_evr = _get_pkgs_evr(pkgs) - newest_evr = rpms_evr.pop() - for pkg in rpms_evr: - if labelCompare(newest_evr, pkg) < 0: - newest_evr = pkg - return newest_evr - - -def _get_kernel_rpm_name(): - base_name = 'kernel' - if version.is_rhel_realtime(): - api.current_logger().info('The Real Time kernel boot detected.') - base_name = 'kernel-rt' + newest_evr = get_EVR(pkgs[0]) + for pkg in pkgs: + evr = get_EVR(pkg) + if labelCompare(newest_evr, evr) < 0: + newest_evr = evr - if version.get_source_major_version() == '7': - return base_name - - # Since RHEL 8, the kernel|kernel-rt rpm is just a metapackage that even - # does not have to be installed on the system. - # The kernel-core|kernel-rt-core rpm is the one we care about instead. - return '{}-core'.format(base_name) + return newest_evr def process(): - kernel_name = _get_kernel_rpm_name() - pkgs = get_pkgs(kernel_name) + kernel_info = utils.require_exactly_one_message_of_type(KernelInfo) + pkgs = get_all_pkgs_with_name(kernel_info.pkg.name) + if not pkgs: - # Hypothatical, user is not allowed to install any kernel that is not signed by RH + # Hypothetical, user is not allowed to install any kernel that is not signed by RH # In case we would like to be cautious, we could check whether there are no other # kernels installed as well. api.current_logger().error('Cannot find any installed kernel signed by Red Hat.') @@ -130,13 +84,13 @@ def process(): reporting.RelatedResource('package', 'kernel') ]) - current_evr = get_current_kernel_evr() - newest_evr = get_newest_evr(pkgs) + current_kernel_evr = get_EVR(kernel_info.pkg) + newest_kernel_evr = get_newest_evr(pkgs) - api.current_logger().debug('Current kernel EVR: {}'.format(current_evr)) - api.current_logger().debug('Newest kernel EVR: {}'.format(newest_evr)) + api.current_logger().debug('Current kernel EVR: {}'.format(current_kernel_evr)) + api.current_logger().debug('Newest kernel EVR: {}'.format(newest_kernel_evr)) - if current_evr != newest_evr: + if current_kernel_evr != newest_kernel_evr: title = 'Newest installed kernel not in use' summary = ('To ensure a stable upgrade, the machine needs to be' ' booted into the latest installed kernel.') diff --git a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/tests/unit_test_checkinstalledkernels.py b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/tests/unit_test_checkinstalledkernels.py index 3f42cb2ef9..816d3886f8 100644 --- a/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/tests/unit_test_checkinstalledkernels.py +++ b/repos/system_upgrade/common/actors/kernel/checkinstalledkernels/tests/unit_test_checkinstalledkernels.py @@ -1,3 +1,5 @@ +from collections import namedtuple + import pytest from leapp import reporting @@ -5,121 +7,81 @@ from leapp.libraries.common.config import architecture from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked from leapp.libraries.stdlib import api -from leapp.models import InstalledRedHatSignedRPM, RPM +from leapp.models import InstalledRedHatSignedRPM, KernelInfo, RPM RH_PACKAGER = 'Red Hat, Inc. ' -# Do not make sense to run any tests when the module is not accessible +# Does not make sense to run any tests when the module is not accessible pytest.importorskip("rpm") -def create_rpm( - version, - release, - name='kernel', - packager=RH_PACKAGER, - pgpsig='SOME_OTHER_SIG_X', - epoch='0', - ): - return RPM( - name=name, - arch=release.split('.')[-1], - version=version, - release='.'.join(release.split('.')[0:-1]), - epoch='0', - packager=RH_PACKAGER, - pgpsig='SOME_OTHER_SIG_X', - ) +# Partial RPM description, missing fields are filled with defaults +RPMDesc = namedtuple('RPMDesc', ('name', 'version', 'release', 'arch')) -def create_rpms(pkgs): - installed_rpms = InstalledRedHatSignedRPM() - for pkg in pkgs: - installed_rpms.items.append( - create_rpm(name=pkg[0], version=pkg[1], release=pkg[2])) - return installed_rpms +def create_rpm(rpm_desc, packager=RH_PACKAGER, pgpsig='SOME_OTHER_SIG_X', epoch='0'): + return RPM(name=rpm_desc.name, arch=rpm_desc.arch, version=rpm_desc.version, release=rpm_desc.release, + epoch='0', packager=RH_PACKAGER, pgpsig='SOME_OTHER_SIG_X') -@pytest.mark.parametrize('vra,version,release', [ - ('3.10.0-1234.21.1.el7.x86_64', '3.10.0', '1234.21.1.el7.x86_64'), - ('5.8.8-100.fc31.x86_64', '5.8.8', '100.fc31.x86_64'), -]) -def test_current_kernel(monkeypatch, vra, version, release): - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(kernel=vra)) - assert version == checkinstalledkernels.get_current_kernel_version() - assert release == checkinstalledkernels.get_current_kernel_release() +def create_rpms(rpm_descriptions): + rpms = [create_rpm(rpm_desc) for rpm_desc in rpm_descriptions] + installed_rpms = InstalledRedHatSignedRPM(items=rpms) + return installed_rpms s390x_pkgs_single = [ - ('kernel', '3.10.0', '957.43.1.el7.s390x'), - ('something', '3.10.0', '957.43.1.el7.s390x'), - ('kernel-something', '3.10.0', '957.43.1.el7.s390x') + RPMDesc(name='kernel', version='3.10.0', release='957.43.1.el7', arch='s390x'), + RPMDesc(name='something', version='3.10.0', release='957.43.1.el7', arch='s390x'), + RPMDesc(name='kernel-something', version='3.10.0', release='957.43.1.el7', arch='s390x'), ] s390x_pkgs_multi = [ - ('kernel', '3.10.0', '957.43.1.el7.s390x'), - ('something', '3.10.0', '957.43.1.el7.s390x'), - ('kernel', '3.10.0', '956.43.1.el7.s390x') + RPMDesc(name='kernel', version='3.10.0', release='957.43.1.el7', arch='s390x'), + RPMDesc(name='something', version='3.10.0', release='957.43.1.el7', arch='s390x'), + RPMDesc(name='kernel', version='3.10.0', release='956.43.1.el7', arch='s390x') ] -def test_single_kernel_s390x(monkeypatch): - msgs = [create_rpms(s390x_pkgs_single)] - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( - arch=architecture.ARCH_S390X, - msgs=msgs, - kernel='3.10.0-957.43.1.el7.s390x'), - ) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - checkinstalledkernels.process() - assert not reporting.create_report.called - - -def test_multi_kernel_s390x(monkeypatch): - msgs = [create_rpms(s390x_pkgs_multi)] - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( - arch=architecture.ARCH_S390X, - msgs=msgs, - kernel='3.10.0-957.43.1.el7.s390x'), +@pytest.mark.parametrize( + ('pkgs', 'should_inhibit'), # First tuple in pkgs is expected to provide the booted kernel + ( + (s390x_pkgs_single, False), + (s390x_pkgs_multi, True) ) +) +def test_s390x_kernel_count_inhibition(monkeypatch, pkgs, should_inhibit): + installed_rpms_msg = create_rpms(pkgs) + kernel_pkg = installed_rpms_msg.items[0] + kernel_info = KernelInfo(pkg=kernel_pkg, uname_r='957.43.1.el7.s390x') + + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(arch=architecture.ARCH_S390X, + msgs=[kernel_info, installed_rpms_msg])) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) checkinstalledkernels.process() - assert reporting.create_report.called - assert reporting.create_report.report_fields['title'] == 'Multiple kernels installed' + assert should_inhibit == bool(reporting.create_report.called) versioned_kernel_pkgs = [ - ('kernel', '3.10.0', '456.43.1.el7.x86_64'), - ('kernel', '3.10.0', '789.35.2.el7.x86_64'), - ('kernel', '3.10.0', '1234.21.1.el7.x86_64') + RPMDesc(name='kernel', version='3.10.0', release='789.35.2.el7', arch='x86_64'), + RPMDesc(name='kernel', version='3.10.0', release='1234.21.1.el7', arch='x86_64'), + RPMDesc(name='kernel', version='4.14.0', release='115.29.1.el7', arch='x86_64'), # [2] - newest + RPMDesc(name='kernel', version='3.10.0', release='456.43.1.el7', arch='x86_64'), ] -@pytest.mark.parametrize('expect_report,msgs,curr_kernel', [ - (False, [create_rpms(versioned_kernel_pkgs)], '3.10.0-1234.21.1.el7.x86_64'), - (True, [create_rpms(versioned_kernel_pkgs)], '3.10.0-456.43.1.el7.x86_64'), - (True, [create_rpms(versioned_kernel_pkgs)], '3.10.0-789.35.2.el7.x86_64'), -]) -def test_newest_kernel(monkeypatch, expect_report, msgs, curr_kernel): - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(kernel=curr_kernel, msgs=msgs)) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - checkinstalledkernels.process() - if expect_report: - assert reporting.create_report.called - assert reporting.create_report.report_fields['title'] == 'Newest installed kernel not in use' - else: - assert not reporting.create_report.called - - -# put the kernel in the middle of the list so that its position doesn't guarantee its rank -versioned_kernel_pkgs.insert(2, ('kernel', '4.14.0', '115.29.1.el7.x86_64')) - - -@pytest.mark.parametrize('expect_report,msgs,curr_kernel', [ - (True, [create_rpms(versioned_kernel_pkgs)], '3.10.0-1234.21.1.el7.x86_64'), - (False, [create_rpms(versioned_kernel_pkgs)], '4.14.0-115.29.1.el7.x86_64'), -]) -def test_newest_kernel_more_versions(monkeypatch, expect_report, msgs, curr_kernel): - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(kernel=curr_kernel, msgs=msgs)) +@pytest.mark.parametrize( + ('expect_report', 'installed_rpms_msg', 'current_kernel_pkg_index'), + ( + (False, create_rpms(versioned_kernel_pkgs), 2), + (True, create_rpms(versioned_kernel_pkgs), 1), + (True, create_rpms(versioned_kernel_pkgs), 0), + ) +) +def test_newest_kernel(monkeypatch, expect_report, installed_rpms_msg, current_kernel_pkg_index): + uname_r = '' # Kernel release is not used to determine the kernel novelty + kernel_info = KernelInfo(pkg=installed_rpms_msg.items[current_kernel_pkg_index], uname_r=uname_r) + msgs = [installed_rpms_msg, kernel_info] + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=msgs)) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) checkinstalledkernels.process() if expect_report: @@ -129,91 +91,87 @@ def test_newest_kernel_more_versions(monkeypatch, expect_report, msgs, curr_kern assert not reporting.create_report.called -@pytest.mark.parametrize('evr', [ - ('', '3.10.0', '1234.21.1.el7.x86_64'), - ('', '3.10.0', '456.43.1.el7.x86_64'), - ('', '3.10.0', '1.1.1.1.1.1.1.2.el7x86_64'), - ('', '4.10.4', '1234.21.1.el7.x86_64'), - ('', '6.6.6', '1234.56.rt78.el9.x86_64'), -]) -def test_get_evr(monkeypatch, evr): - pkg = create_rpm(version=evr[1], release=evr[2]) - assert checkinstalledkernels.get_EVR(pkg) == evr +@pytest.mark.parametrize( + 'rpm_desc', + [ + RPMDesc(name='', version='3.10.0', release='1234.21.1.el7', arch='x86_64'), + RPMDesc(name='', version='3.10.0', release='456.43.1.el7', arch='x86_64'), + RPMDesc(name='', version='3.10.0', release='1.1.1.1.1.1.1.2', arch='x86_64'), + RPMDesc(name='', version='4.10.4', release='1234.21.1.el7', arch='x86_64'), + RPMDesc(name='', version='6.6.6', release='1234.56.rt78.el9', arch='x86_64'), + ] +) +def test_get_evr(monkeypatch, rpm_desc): + pkg = create_rpm(rpm_desc) + assert checkinstalledkernels.get_EVR(pkg) == ('', pkg.version, pkg.release) versioned_kernel_rt_pkgs = [ - ('kernel-rt', '3.10.0', '789.35.2.rt56.1133.el7.x86_64'), - ('kernel-rt', '3.10.0', '789.35.2.rt57.1133.el7.x86_64'), - ('kernel-rt', '3.10.0', '789.35.2.rt101.1133.el7.x86_64'), - ('kernel-rt', '3.10.0', '790.35.2.rt666.1133.el7.x86_64'), + RPMDesc(name='kernel-rt', version='3.10.0', release='789.35.2.rt56.1133.el7', arch='x86_64'), + RPMDesc(name='kernel-rt', version='3.10.0', release='789.35.2.rt57.1133.el7', arch='x86_64'), + RPMDesc(name='kernel-rt', version='3.10.0', release='789.35.2.rt101.1133.el7', arch='x86_64'), + RPMDesc(name='kernel-rt', version='3.10.0', release='790.35.2.rt666.1133.el7', arch='x86_64'), # [3] - newest ] -@pytest.mark.parametrize('msgs,num,name', [ - ([create_rpms(versioned_kernel_rt_pkgs)], 4, 'kernel-rt'), - ([create_rpms(versioned_kernel_rt_pkgs[0:-1])], 3, 'kernel-rt'), - ([create_rpms(versioned_kernel_rt_pkgs[0:-2])], 2, 'kernel-rt'), - ([create_rpms(versioned_kernel_rt_pkgs[0:-3])], 1, 'kernel-rt'), - ([create_rpms(versioned_kernel_rt_pkgs)], 0, 'kernel'), - ([create_rpms(versioned_kernel_rt_pkgs)], 0, 'smth'), - ([create_rpms(versioned_kernel_pkgs)], 0, 'kernel-rt'), - ([create_rpms(versioned_kernel_pkgs + versioned_kernel_rt_pkgs)], 4, 'kernel-rt'), - ([create_rpms(versioned_kernel_pkgs + versioned_kernel_rt_pkgs)], 4, 'kernel'), -]) +@pytest.mark.parametrize( + ('msgs', 'num', 'name'), + [ + ([create_rpms(versioned_kernel_rt_pkgs)], 4, 'kernel-rt'), + ([create_rpms(versioned_kernel_rt_pkgs[0:-1])], 3, 'kernel-rt'), + ([create_rpms(versioned_kernel_rt_pkgs[0:-2])], 2, 'kernel-rt'), + ([create_rpms(versioned_kernel_rt_pkgs[0:-3])], 1, 'kernel-rt'), + ([create_rpms(versioned_kernel_rt_pkgs)], 0, 'kernel'), + ([create_rpms(versioned_kernel_rt_pkgs)], 0, 'smth'), + ([create_rpms(versioned_kernel_pkgs)], 0, 'kernel-rt'), + ([create_rpms(versioned_kernel_pkgs + versioned_kernel_rt_pkgs)], 4, 'kernel-rt'), + ([create_rpms(versioned_kernel_pkgs + versioned_kernel_rt_pkgs)], 4, 'kernel'), + ] +) def test_get_pkgs(monkeypatch, msgs, num, name): monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=msgs)) - pkgs = checkinstalledkernels.get_pkgs(name) + pkgs = checkinstalledkernels.get_all_pkgs_with_name(name) assert len(pkgs) == num -@pytest.mark.parametrize('expect_report,msgs,curr_kernel', [ - # kernel-rt only - (True, [create_rpms(versioned_kernel_rt_pkgs)], '3.10.0-789.35.2.rt56.1133.el7.x86_64'), - (True, [create_rpms(versioned_kernel_rt_pkgs)], '3.10.0-789.35.2.rt57.1133.el7.x86_64'), - (True, [create_rpms(versioned_kernel_rt_pkgs)], '3.10.0-789.35.2.rt101.1133.el7.x86_64'), - (False, [create_rpms(versioned_kernel_rt_pkgs)], '3.10.0-790.35.2.rt666.1133.el7.x86_64'), - (False, [create_rpms(versioned_kernel_rt_pkgs[0:-1])], '3.10.0-789.35.2.rt101.1133.el7.x86_64'), - (False, [create_rpms(versioned_kernel_rt_pkgs[0:1])], '3.10.0-789.35.2.rt56.1133.el7.x86_64'), +mixed_kernel_pkgs = create_rpms(versioned_kernel_rt_pkgs + versioned_kernel_pkgs) +mixed_kernel_pkgs_desc_table = { # Maps important pkgs from mixed_kernel_pkgs to their index so they can be ref'd + 'newest_rt': 3, + 'older_rt': 2, + 'newest_ordinary': 6, + 'older_ordinary': 5, +} + + +@pytest.mark.parametrize( + ('expect_report', 'installed_rpms_msg', 'curr_kernel_pkg_index'), + [ + # kernel-rt only + (True, create_rpms(versioned_kernel_rt_pkgs), 0), + (True, create_rpms(versioned_kernel_rt_pkgs), 1), + (True, create_rpms(versioned_kernel_rt_pkgs), 2), + (False, create_rpms(versioned_kernel_rt_pkgs), 3), # newest + (False, create_rpms(versioned_kernel_rt_pkgs[0:-1]), 2), + (False, create_rpms(versioned_kernel_rt_pkgs[0:1]), 0), + + # mix of kernel-rt + kernel + (True, mixed_kernel_pkgs, mixed_kernel_pkgs_desc_table['older_rt']), + (False, mixed_kernel_pkgs, mixed_kernel_pkgs_desc_table['newest_rt']), + (True, mixed_kernel_pkgs, mixed_kernel_pkgs_desc_table['older_ordinary']), + (False, mixed_kernel_pkgs, mixed_kernel_pkgs_desc_table['newest_ordinary']), + ] +) +def test_newest_kernel_realtime(monkeypatch, expect_report, installed_rpms_msg, curr_kernel_pkg_index): + current_kernel_pkg = installed_rpms_msg.items[curr_kernel_pkg_index] + kernel_info = KernelInfo(pkg=current_kernel_pkg, uname_r='') + msgs = [installed_rpms_msg, kernel_info] - # mix of kernel-rt + kernel - ( - True, - [create_rpms(versioned_kernel_rt_pkgs + versioned_kernel_pkgs)], - '3.10.0-789.35.2.rt101.1133.el7.x86_64' - ), - ( - False, - [create_rpms(versioned_kernel_rt_pkgs + versioned_kernel_pkgs)], - '3.10.0-790.35.2.rt666.1133.el7.x86_64' - ), - ( - True, - [create_rpms(versioned_kernel_rt_pkgs + versioned_kernel_pkgs)], - '3.10.0-1234.21.1.el7.x86_64' - ), - ( - False, - [create_rpms(versioned_kernel_rt_pkgs + versioned_kernel_pkgs)], - '4.14.0-115.29.1.el7.x86_64' - ), -]) -def test_newest_kernel_realtime(monkeypatch, expect_report, msgs, curr_kernel): - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(kernel=curr_kernel, msgs=msgs)) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=msgs)) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) checkinstalledkernels.process() + if expect_report: assert reporting.create_report.called assert reporting.create_report.report_fields['title'] == 'Newest installed kernel not in use' else: assert not reporting.create_report.called - - -@pytest.mark.parametrize('current_actor_mocked,expected_name', [ - (CurrentActorMocked(kernel='3.10.0-957.43.1.el7.x86_64', src_ver='7.9'), 'kernel'), - (CurrentActorMocked(kernel='3.10.0-789.35.2.rt56.1133.el7.x86_64', src_ver='7.9'), 'kernel-rt'), - (CurrentActorMocked(kernel='4.14.0-115.29.1.el7.x86_64', src_ver='8.6'), 'kernel-core'), - (CurrentActorMocked(kernel='4.14.0-789.35.2.rt56.1133.el8.x86_64', src_ver='8.6'), 'kernel-rt-core'), -]) -def test_kernel_name(monkeypatch, current_actor_mocked, expected_name): - monkeypatch.setattr(api, 'current_actor', current_actor_mocked) - assert expected_name == checkinstalledkernels._get_kernel_rpm_name() diff --git a/repos/system_upgrade/common/actors/kernelcmdlineconfig/actor.py b/repos/system_upgrade/common/actors/kernelcmdlineconfig/actor.py index 9fe28ea8ad..13c471135d 100644 --- a/repos/system_upgrade/common/actors/kernelcmdlineconfig/actor.py +++ b/repos/system_upgrade/common/actors/kernelcmdlineconfig/actor.py @@ -3,7 +3,7 @@ from leapp.actors import Actor from leapp.exceptions import StopActorExecutionError from leapp.libraries.actor import kernelcmdlineconfig -from leapp.models import FirmwareFacts, InstalledTargetKernelVersion, KernelCmdlineArg, TargetKernelCmdlineArgTasks +from leapp.models import FirmwareFacts, InstalledTargetKernelInfo, KernelCmdlineArg, TargetKernelCmdlineArgTasks from leapp.tags import FinalizationPhaseTag, IPUWorkflowTag @@ -13,7 +13,7 @@ class KernelCmdlineConfig(Actor): """ name = 'kernelcmdlineconfig' - consumes = (KernelCmdlineArg, InstalledTargetKernelVersion, FirmwareFacts, TargetKernelCmdlineArgTasks) + consumes = (KernelCmdlineArg, InstalledTargetKernelInfo, FirmwareFacts, TargetKernelCmdlineArgTasks) produces = () tags = (FinalizationPhaseTag, IPUWorkflowTag) diff --git a/repos/system_upgrade/common/actors/kernelcmdlineconfig/libraries/kernelcmdlineconfig.py b/repos/system_upgrade/common/actors/kernelcmdlineconfig/libraries/kernelcmdlineconfig.py index 7d013a5422..f98e8168c4 100644 --- a/repos/system_upgrade/common/actors/kernelcmdlineconfig/libraries/kernelcmdlineconfig.py +++ b/repos/system_upgrade/common/actors/kernelcmdlineconfig/libraries/kernelcmdlineconfig.py @@ -2,7 +2,7 @@ from leapp.libraries import stdlib from leapp.libraries.common.config import architecture from leapp.libraries.stdlib import api -from leapp.models import InstalledTargetKernelVersion, KernelCmdlineArg, TargetKernelCmdlineArgTasks +from leapp.models import InstalledTargetKernelInfo, KernelCmdlineArg, TargetKernelCmdlineArgTasks def run_grubby_cmd(cmd): @@ -32,8 +32,8 @@ def format_kernelarg_msgs_for_grubby_cmd(kernelarg_msgs): def modify_kernel_args_in_boot_cfg(configs_to_modify_explicitly=None): - kernel_version = next(api.consume(InstalledTargetKernelVersion), None) - if not kernel_version: + kernel_info = next(api.consume(InstalledTargetKernelInfo), None) + if not kernel_info: return # Collect desired kernelopt modifications @@ -46,7 +46,7 @@ def modify_kernel_args_in_boot_cfg(configs_to_modify_explicitly=None): if not kernelargs_msgs_to_add and not kernelargs_msgs_to_remove: return # There is no work to do - grubby_modify_kernelargs_cmd = ['grubby', '--update-kernel=/boot/vmlinuz-{}'.format(kernel_version.version)] + grubby_modify_kernelargs_cmd = ['grubby', '--update-kernel={0}'.format(kernel_info.kernel_img_path)] if kernelargs_msgs_to_add: grubby_modify_kernelargs_cmd += [ diff --git a/repos/system_upgrade/common/actors/kernelcmdlineconfig/tests/test_kernelcmdlineconfig.py b/repos/system_upgrade/common/actors/kernelcmdlineconfig/tests/test_kernelcmdlineconfig.py index 66f4f62f8d..3f34789924 100644 --- a/repos/system_upgrade/common/actors/kernelcmdlineconfig/tests/test_kernelcmdlineconfig.py +++ b/repos/system_upgrade/common/actors/kernelcmdlineconfig/tests/test_kernelcmdlineconfig.py @@ -7,9 +7,9 @@ from leapp.libraries.common.config import architecture from leapp.libraries.common.testutils import CurrentActorMocked from leapp.libraries.stdlib import api -from leapp.models import InstalledTargetKernelVersion, KernelCmdlineArg, TargetKernelCmdlineArgTasks +from leapp.models import InstalledTargetKernelInfo, KernelCmdlineArg, TargetKernelCmdlineArgTasks -KERNEL_VERSION = '1.2.3-4.x86_64.el8' +TARGET_KERNEL_NEVRA = 'kernel-core-1.2.3-4.x86_64.el8.x64_64' class MockedRun(object): @@ -51,21 +51,25 @@ def __call__(self, cmd, *args, **kwargs): ] ) def test_kernelcmdline_config_valid_msgs(monkeypatch, msgs, expected_grubby_kernelopt_args): - grubby_base_cmd = ['grubby', '--update-kernel=/boot/vmlinuz-{}'.format(KERNEL_VERSION)] + kernel_img_path = '/boot/vmlinuz-X' + kernel_info = InstalledTargetKernelInfo(pkg_nevra=TARGET_KERNEL_NEVRA, + kernel_img_path=kernel_img_path, + initramfs_path='/boot/initramfs-X') + msgs += [kernel_info] + + grubby_base_cmd = ['grubby', '--update-kernel={}'.format(kernel_img_path)] expected_grubby_cmd = grubby_base_cmd + expected_grubby_kernelopt_args mocked_run = MockedRun() monkeypatch.setattr(stdlib, 'run', mocked_run) - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_X86_64, - msgs=[InstalledTargetKernelVersion(version=KERNEL_VERSION)] + msgs)) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_X86_64, msgs=msgs)) kernelcmdlineconfig.modify_kernel_args_in_boot_cfg() assert mocked_run.commands and len(mocked_run.commands) == 1 assert expected_grubby_cmd == mocked_run.commands.pop() mocked_run = MockedRun() monkeypatch.setattr(stdlib, 'run', mocked_run) - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_S390X, - msgs=[InstalledTargetKernelVersion(version=KERNEL_VERSION)] + msgs)) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_S390X, msgs=msgs)) kernelcmdlineconfig.modify_kernel_args_in_boot_cfg() assert mocked_run.commands and len(mocked_run.commands) == 2 assert expected_grubby_cmd == mocked_run.commands.pop(0) @@ -73,16 +77,21 @@ def test_kernelcmdline_config_valid_msgs(monkeypatch, msgs, expected_grubby_kern def test_kernelcmdline_explicit_configs(monkeypatch): + kernel_img_path = '/boot/vmlinuz-X' + + kernel_info = InstalledTargetKernelInfo(pkg_nevra=TARGET_KERNEL_NEVRA, + kernel_img_path=kernel_img_path, + initramfs_path='/boot/initramfs-X') + msgs = [kernel_info, TargetKernelCmdlineArgTasks(to_remove=[KernelCmdlineArg(key='key1', value='value1')])] + mocked_run = MockedRun() monkeypatch.setattr(stdlib, 'run', mocked_run) - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_X86_64, - msgs=[InstalledTargetKernelVersion(version=KERNEL_VERSION), - TargetKernelCmdlineArgTasks(to_remove=[KernelCmdlineArg(key='key1', value='value1')])])) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_X86_64, msgs=msgs)) configs = ['/boot/grub2/grub.cfg', '/boot/efi/EFI/redhat/grub.cfg'] kernelcmdlineconfig.modify_kernel_args_in_boot_cfg(configs_to_modify_explicitly=configs) - grubby_cmd_without_config = ['grubby', '--update-kernel=/boot/vmlinuz-{}'.format(KERNEL_VERSION), + grubby_cmd_without_config = ['grubby', '--update-kernel={}'.format(kernel_img_path), '--remove-args', 'key1=value1'] expected_cmds = [ grubby_cmd_without_config + ['-c', '/boot/grub2/grub.cfg'], @@ -93,10 +102,13 @@ def test_kernelcmdline_explicit_configs(monkeypatch): def test_kernelcmdline_config_no_args(monkeypatch): + kernel_info = InstalledTargetKernelInfo(pkg_nevra=TARGET_KERNEL_NEVRA, + kernel_img_path='/boot/vmlinuz-X', + initramfs_path='/boot/initramfs-X') + mocked_run = MockedRun() monkeypatch.setattr(stdlib, 'run', mocked_run) - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_S390X, - msgs=[InstalledTargetKernelVersion(version=KERNEL_VERSION)])) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(architecture.ARCH_S390X, msgs=[kernel_info])) kernelcmdlineconfig.modify_kernel_args_in_boot_cfg() assert not mocked_run.commands diff --git a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/actor.py b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/actor.py index 7c8d8552cc..8b71d2d92f 100644 --- a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/actor.py +++ b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/actor.py @@ -1,6 +1,6 @@ from leapp.actors import Actor from leapp.libraries.actor import scankernel -from leapp.models import InstalledTargetKernelVersion, TransactionCompleted +from leapp.models import InstalledTargetKernelInfo, InstalledTargetKernelVersion, KernelInfo, TransactionCompleted from leapp.tags import IPUWorkflowTag, RPMUpgradePhaseTag @@ -15,8 +15,8 @@ class ScanInstalledTargetKernelVersion(Actor): """ name = 'scan_installed_target_kernel_version' - consumes = (TransactionCompleted,) - produces = (InstalledTargetKernelVersion,) + consumes = (TransactionCompleted, KernelInfo) + produces = (InstalledTargetKernelInfo, InstalledTargetKernelVersion) tags = (RPMUpgradePhaseTag, IPUWorkflowTag) def process(self): diff --git a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/libraries/scankernel.py b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/libraries/scankernel.py index 14e0891ab5..6b75ed1399 100644 --- a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/libraries/scankernel.py +++ b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/libraries/scankernel.py @@ -1,53 +1,80 @@ -from leapp.libraries.common.config.version import get_target_major_version, is_rhel_realtime +import os +from collections import namedtuple + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common import kernel as kernel_lib +from leapp.libraries.common.config.version import get_target_major_version from leapp.libraries.stdlib import api, CalledProcessError, run -from leapp.models import InstalledTargetKernelVersion +from leapp.models import InstalledTargetKernelInfo, InstalledTargetKernelVersion, KernelInfo + +KernelBootFiles = namedtuple('KernelBootFiles', ('vmlinuz_path', 'initramfs_path')) -def _get_kernel_version(kernel_name): +def get_target_kernel_package_nevra(kernel_pkg_name): try: - kernels = run(['rpm', '-q', kernel_name], split=True)['stdout'] + kernel_nevras = run(['rpm', '-q', kernel_pkg_name], split=True)['stdout'] except CalledProcessError: return '' - for kernel in kernels: - # name-version-release - we want the last two fields only - version = '-'.join(kernel.split('-')[-2:]) - if 'el{}'.format(get_target_major_version()) in version: - return version + target_kernel_el = 'el{}'.format(get_target_major_version()) + for kernel_nevra in kernel_nevras: + if target_kernel_el in kernel_nevra: + return kernel_nevra return '' +def get_boot_files_provided_by_kernel_pkg(kernel_nevra): + initramfs_path = '' + vmlinuz_path = '' + err_msg = 'Cannot determine location of the target kernel boot image and corresponding initramfs .' + try: + kernel_pkg_files = run(['rpm', '-q', '-l', kernel_nevra], split=True)['stdout'] + for kernel_file_path in kernel_pkg_files: + dirname = os.path.dirname(kernel_file_path) + if dirname != '/boot': + continue + basename = os.path.basename(kernel_file_path) + if basename.startswith('vmlinuz'): + vmlinuz_path = kernel_file_path + elif basename.startswith('initramfs'): + initramfs_path = kernel_file_path + except CalledProcessError: + raise StopActorExecutionError(err_msg) + if not vmlinuz_path or not initramfs_path: + raise StopActorExecutionError(err_msg) + return KernelBootFiles(vmlinuz_path=vmlinuz_path, initramfs_path=initramfs_path) + + def process(): # pylint: disable=no-else-return - false positive # TODO: should we take care about stuff of kernel-rt and kernel in the same # time when both are present? or just one? currently, handle only one # of these during the upgrade. kernel-rt has higher prio when original sys # was realtime + src_kernel_info = next(api.consume(KernelInfo), None) + if not src_kernel_info: + return # Will not happen, other actors would inhibit the upgrade + + target_ver = get_target_major_version() + target_kernel_pkg_name = kernel_lib.get_kernel_pkg_name(target_ver, src_kernel_info.type) + target_kernel_nevra = get_target_kernel_package_nevra(target_kernel_pkg_name) + + if src_kernel_info.type != kernel_lib.KernelType.ORDINARY and not target_kernel_nevra: + api.current_logger().warning('The kernel-rt-core rpm from the target RHEL has not been detected. Switching ' + 'to non-preemptive kernel.') + target_kernel_pkg_name = kernel_lib.get_kernel_pkg_name(target_ver, kernel_lib.KernelType.ORDINARY) + target_kernel_nevra = get_target_kernel_package_nevra(target_kernel_pkg_name) + + if target_kernel_nevra: + boot_files = get_boot_files_provided_by_kernel_pkg(target_kernel_nevra) + installed_kernel_info = InstalledTargetKernelInfo(pkg_nevra=target_kernel_nevra, + kernel_img_path=boot_files.vmlinuz_path, + initramfs_path=boot_files.initramfs_path) + api.produce(installed_kernel_info) - if is_rhel_realtime(): - version = _get_kernel_version('kernel-rt-core') - if version: - api.produce(InstalledTargetKernelVersion(version=version)) - return - else: - api.current_logger().warning( - 'The kernel-rt-core rpm from the target RHEL has not been detected. ' - 'Switching to non-preemptive kernel.' - ) - # TODO: create report with instructions to install kernel-rt manually - # - attach link to article if any - # - this possibly happens just in case the repository with kernel-rt - # # is not enabled during the upgrade. - - # standard (non-preemptive) kernel - version = _get_kernel_version('kernel-core') - if version: + # Backwards compatibility + # Expects that the kernel nevra has the following format: --. + version = '-'.join(target_kernel_nevra.split('-')[-2:]) # (-2)-th is ; take -... api.produce(InstalledTargetKernelVersion(version=version)) else: - # This is very unexpected situation. At least one kernel has to be - # installed always. Some actors consuming the InstalledTargetKernelVersion - # will crash without the created message. I am keeping kind of original - # behaviour in this case, but at least the let me log the error msg - # - api.current_logger().error('Cannot detect any kernel RPM') - # StopActorExecutionError('Cannot detect any target RHEL kernel RPM.') + raise StopActorExecutionError('Cannot detect any target RHEL kernel RPM.') # Unexpected. diff --git a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/tests/test_scaninstalledkernel_scaninstalledtargetkernelversion.py b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/tests/test_scaninstalledkernel_scaninstalledtargetkernelversion.py index 07e84f8896..7828074d0c 100644 --- a/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/tests/test_scaninstalledkernel_scaninstalledtargetkernelversion.py +++ b/repos/system_upgrade/common/actors/scaninstalledtargetkernelversion/tests/test_scaninstalledkernel_scaninstalledtargetkernelversion.py @@ -1,16 +1,17 @@ import pytest +from leapp.exceptions import StopActorExecutionError from leapp.libraries import stdlib from leapp.libraries.actor import scankernel +from leapp.libraries.common import kernel as kernel_lib from leapp.libraries.common.testutils import CurrentActorMocked, logger_mocked from leapp.libraries.stdlib import api +from leapp.models import InstalledTargetKernelInfo, InstalledTargetKernelVersion, KernelInfo, RPM -TARGET_KERNEL_VERSION = '1.2.3-4.el9.x86_64' -TARGET_RT_KERNEL_VERSION = '1.2.3-4.rt56.7.el9.x86_64' -TARGET_KERNEL = 'kernel-core-{}'.format(TARGET_KERNEL_VERSION) -TARGET_RT_KERNEL = 'kernel-rt-core-{}'.format(TARGET_RT_KERNEL_VERSION) -OLD_KERNEL = 'kernel-core-0.1.2-3.el8.x86_64' -OLD_RT_KERNEL = 'kernel-rt-core-0.1.2-3.rt4.5.el8.x86_64' +TARGET_KERNEL_NEVRA = 'kernel-core-1.2.3-4.el9.x86_64' +TARGET_RT_KERNEL_NEVRA = 'kernel-rt-core-1.2.3-4.rt56.7.el9.x86_64' +OLD_KERNEL_NEVRA = 'kernel-core-0.1.2-3.el8.x86_64' +OLD_RT_KERNEL_NEVRA = 'kernel-rt-core-0.1.2-3.rt4.5.el8.x86_64' class MockedRun(object): @@ -26,55 +27,133 @@ def __call__(self, *args, **kwargs): return {'stdout': []} -@pytest.mark.parametrize('is_rt,exp_version,stdouts', [ - (False, TARGET_KERNEL_VERSION, {'kernel-core': [OLD_KERNEL, TARGET_KERNEL]}), - (False, TARGET_KERNEL_VERSION, {'kernel-core': [TARGET_KERNEL, OLD_KERNEL]}), - (False, TARGET_KERNEL_VERSION, { - 'kernel-core': [TARGET_KERNEL, OLD_KERNEL], - 'kernel-rt-core': [TARGET_RT_KERNEL, OLD_RT_KERNEL], - }), - (True, TARGET_RT_KERNEL_VERSION, { - 'kernel-rt-core': [OLD_RT_KERNEL, TARGET_RT_KERNEL] - }), - (True, TARGET_RT_KERNEL_VERSION, { - 'kernel-rt-core': [TARGET_RT_KERNEL, OLD_RT_KERNEL] - }), - (True, TARGET_RT_KERNEL_VERSION, { - 'kernel-core': [TARGET_KERNEL, OLD_KERNEL], - 'kernel-rt-core': [TARGET_RT_KERNEL, OLD_RT_KERNEL], - }), -]) -def test_scaninstalledkernel(monkeypatch, is_rt, exp_version, stdouts): +def assert_produced_messages_are_correct(produced_messages, expected_target_nevra, initramfs_path, kernel_img_path): + target_evra = expected_target_nevra.replace('kernel-core-', '').replace('kernel-rt-core-', '') + installed_kernel_ver = [msg for msg in produced_messages if isinstance(msg, InstalledTargetKernelVersion)] + assert len(installed_kernel_ver) == 1, 'Actor should produce InstalledTargetKernelVersion (backwards compat.)' + assert installed_kernel_ver[0].version == target_evra + + installed_kernel_info = [msg for msg in produced_messages if isinstance(msg, InstalledTargetKernelInfo)] + assert len(installed_kernel_info) == 1 + assert installed_kernel_info[0].pkg_nevra == expected_target_nevra + + assert installed_kernel_info[0].initramfs_path == initramfs_path + assert installed_kernel_info[0].kernel_img_path == kernel_img_path + + +@pytest.mark.parametrize( + ('is_rt', 'expected_target_nevra', 'stdouts'), + [ + (False, TARGET_KERNEL_NEVRA, {'kernel-core': [OLD_KERNEL_NEVRA, TARGET_KERNEL_NEVRA]}), + (False, TARGET_KERNEL_NEVRA, {'kernel-core': [TARGET_KERNEL_NEVRA, OLD_KERNEL_NEVRA]}), + (False, TARGET_KERNEL_NEVRA, { + 'kernel-core': [TARGET_KERNEL_NEVRA, OLD_KERNEL_NEVRA], + 'kernel-rt-core': [TARGET_RT_KERNEL_NEVRA, OLD_RT_KERNEL_NEVRA], + }), + (True, TARGET_RT_KERNEL_NEVRA, { + 'kernel-rt-core': [OLD_RT_KERNEL_NEVRA, TARGET_RT_KERNEL_NEVRA] + }), + (True, TARGET_RT_KERNEL_NEVRA, { + 'kernel-rt-core': [TARGET_RT_KERNEL_NEVRA, OLD_RT_KERNEL_NEVRA] + }), + (True, TARGET_RT_KERNEL_NEVRA, { + 'kernel-core': [TARGET_KERNEL_NEVRA, OLD_KERNEL_NEVRA], + 'kernel-rt-core': [TARGET_RT_KERNEL_NEVRA, OLD_RT_KERNEL_NEVRA], + }), + ] +) +def test_scaninstalledkernel(monkeypatch, is_rt, expected_target_nevra, stdouts): + src_kernel_pkg = RPM(name='kernel-core', arch='x86_64', version='0.1.2', release='3', + epoch='0', packager='', pgpsig='SOME_OTHER_SIG_X') + src_kernel_type = kernel_lib.KernelType.REALTIME if is_rt else kernel_lib.KernelType.ORDINARY + src_kernel_info = KernelInfo(pkg=src_kernel_pkg, type=src_kernel_type, uname_r='X') + + def patched_get_boot_files(nevra): + assert nevra == expected_target_nevra + return scankernel.KernelBootFiles(vmlinuz_path='/boot/vmlinuz-X', initramfs_path='/boot/initramfs-X') + result = [] - old_kver = '0.1.2-3.rt4.5.el8.x86_64' if is_rt else 'kernel-core-0.1.2-3.el88x86_64' - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(dst_ver='9.0', kernel=old_kver)) + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(dst_ver='9.0', msgs=[src_kernel_info])) monkeypatch.setattr(api, 'produce', result.append) monkeypatch.setattr(scankernel, 'run', MockedRun(stdouts)) + monkeypatch.setattr(scankernel, 'get_boot_files_provided_by_kernel_pkg', patched_get_boot_files) + scankernel.process() - assert len(result) == 1 and result[0].version == exp_version + + assert_produced_messages_are_correct(result, expected_target_nevra, '/boot/initramfs-X', '/boot/vmlinuz-X') + + +@pytest.mark.parametrize( + ('vmlinuz_path', 'initramfs_path', 'extra_kernel_rpm_files'), + ( + ('/boot/vmlinuz-x', '/boot/initramfs-x', []), + ('/boot/vmlinuz-x', '/boot/initramfs-x', ['/lib/modules/6.4.10-100.fc37.x86_64/vmlinuz']), + (None, '/boot/initramfs-x', ['/lib/modules/6.4.10-100.fc37.x86_64/vmlinuz']), + ('/boot/vmlinuz-x', None, ['/lib/modules/6.4.10-100.fc37.x86_64/vmlinuz']), + ) +) +def test_get_boot_files_provided_by_kernel_pkg(monkeypatch, vmlinuz_path, initramfs_path, extra_kernel_rpm_files): + def mocked_run(cmd, *args, **kwargs): + assert cmd == ['rpm', '-q', '-l', TARGET_KERNEL_NEVRA] + + output = list(extra_kernel_rpm_files) + if vmlinuz_path: + output.append(vmlinuz_path) + if initramfs_path: + output.append(initramfs_path) + + return { + 'stdout': output + } + + monkeypatch.setattr(scankernel, 'run', mocked_run) + + if not vmlinuz_path or not initramfs_path: + with pytest.raises(StopActorExecutionError): + scankernel.get_boot_files_provided_by_kernel_pkg(TARGET_KERNEL_NEVRA) + else: + result = scankernel.get_boot_files_provided_by_kernel_pkg(TARGET_KERNEL_NEVRA) + assert result.vmlinuz_path == vmlinuz_path + assert result.initramfs_path == initramfs_path def test_scaninstalledkernel_missing_rt(monkeypatch): + src_kernel_pkg = RPM(name='kernel-rt-core', arch='x86_64', version='0.1.2', release='3', + epoch='0', packager='', pgpsig='SOME_OTHER_SIG_X') + src_kernel_type = kernel_lib.KernelType.REALTIME + src_kernel_info = KernelInfo(pkg=src_kernel_pkg, type=src_kernel_type, uname_r='X') + result = [] - old_kver = '0.1.2-3.rt4.5.el8.x86_64' - stdouts = {'kernel-core': [TARGET_KERNEL], 'kernel-rt-core': [OLD_RT_KERNEL]} - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(dst_ver='9.0', kernel=old_kver)) + stdouts = {'kernel-core': [TARGET_KERNEL_NEVRA], 'kernel-rt-core': [OLD_RT_KERNEL_NEVRA]} + + def patched_get_boot_content(target_nevra): + return scankernel.KernelBootFiles(vmlinuz_path='/boot/vmlinuz-X', initramfs_path='/boot/initramfs-X') + + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(dst_ver='9.0', msgs=[src_kernel_info])) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(api, 'produce', result.append) monkeypatch.setattr(scankernel, 'run', MockedRun(stdouts)) + monkeypatch.setattr(scankernel, 'get_boot_files_provided_by_kernel_pkg', patched_get_boot_content) + scankernel.process() + assert api.current_logger.warnmsg - assert len(result) == 1 and result[0].version == TARGET_KERNEL_VERSION + + assert_produced_messages_are_correct(result, TARGET_KERNEL_NEVRA, '/boot/initramfs-X', '/boot/vmlinuz-X') def test_scaninstalledkernel_missing(monkeypatch): + src_kernel_pkg = RPM(name='kernel-rt-core', arch='x86_64', version='0.1.2', release='3', + epoch='0', packager='', pgpsig='SOME_OTHER_SIG_X') + src_kernel_type = kernel_lib.KernelType.REALTIME + src_kernel_info = KernelInfo(pkg=src_kernel_pkg, type=src_kernel_type, uname_r='X') + result = [] - old_kver = '0.1.2-3.rt4.5.el8.x86_64' - monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(kernel=old_kver)) + + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[src_kernel_info])) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(api, 'produce', result.append) monkeypatch.setattr(scankernel, 'run', MockedRun({})) - scankernel.process() - assert api.current_logger.warnmsg - assert api.current_logger.errmsg - assert not result + + with pytest.raises(StopActorExecutionError): + scankernel.process() diff --git a/repos/system_upgrade/common/actors/scansourcekernel/actor.py b/repos/system_upgrade/common/actors/scansourcekernel/actor.py new file mode 100644 index 0000000000..1d9657223b --- /dev/null +++ b/repos/system_upgrade/common/actors/scansourcekernel/actor.py @@ -0,0 +1,18 @@ +from leapp.actors import Actor +from leapp.libraries.actor import scan_source_kernel as scan_source_kernel_lib +from leapp.models import InstalledRPM, KernelInfo +from leapp.tags import FactsPhaseTag, IPUWorkflowTag + + +class ScanSourceKernel(Actor): + """ + Scan the source system kernel. + """ + + name = 'scan_source_kernel' + consumes = (InstalledRPM,) + produces = (KernelInfo,) + tags = (IPUWorkflowTag, FactsPhaseTag) + + def process(self): + scan_source_kernel_lib.scan_source_kernel() diff --git a/repos/system_upgrade/common/actors/scansourcekernel/libraries/scan_source_kernel.py b/repos/system_upgrade/common/actors/scansourcekernel/libraries/scan_source_kernel.py new file mode 100644 index 0000000000..f85a3b5b5b --- /dev/null +++ b/repos/system_upgrade/common/actors/scansourcekernel/libraries/scan_source_kernel.py @@ -0,0 +1,30 @@ +import itertools + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common import kernel as kernel_lib +from leapp.libraries.common.config.version import get_source_version +from leapp.libraries.stdlib import api +from leapp.models import InstalledRPM, KernelInfo + + +def scan_source_kernel(): + uname_r = api.current_actor().configuration.kernel + installed_rpms = [msg.items for msg in api.consume(InstalledRPM)] + installed_rpms = list(itertools.chain(*installed_rpms)) + + kernel_type = kernel_lib.determine_kernel_type_from_uname(get_source_version(), uname_r) + kernel_pkg_info = kernel_lib.get_kernel_pkg_info_for_uname_r(uname_r) + + kernel_pkg_id = (kernel_pkg_info.name, kernel_pkg_info.version, kernel_pkg_info.release, kernel_pkg_info.arch) + kernel_pkg = None + for pkg in installed_rpms: + pkg_id = (pkg.name, pkg.version, pkg.release, pkg.arch) + if kernel_pkg_id == pkg_id: + kernel_pkg = pkg + break + + if not kernel_pkg: + raise StopActorExecutionError(message='Unable to identify package providing the booted kernel.') + + kernel_info = KernelInfo(pkg=kernel_pkg, type=kernel_type, uname_r=uname_r) + api.produce(kernel_info) diff --git a/repos/system_upgrade/common/libraries/config/version.py b/repos/system_upgrade/common/libraries/config/version.py index e146bd9bad..0f1e5874a2 100644 --- a/repos/system_upgrade/common/libraries/config/version.py +++ b/repos/system_upgrade/common/libraries/config/version.py @@ -2,7 +2,9 @@ import six +from leapp.libraries.common import kernel as kernel_lib from leapp.libraries.stdlib import api +from leapp.utils.deprecation import deprecated OP_MAP = { '>': operator.gt, @@ -285,6 +287,7 @@ def is_rhel_alt(): return conf.os_release.release_id == 'rhel' and conf.kernel[0] == '4' +@deprecated(since='2023-08-15', message='This information is now provided by KernelInfo message.') def is_rhel_realtime(): """ Check whether the original system is RHEL Real Time. @@ -301,7 +304,9 @@ def is_rhel_realtime(): conf = api.current_actor().configuration if conf.os_release.release_id != 'rhel': return False - return '.rt' in conf.kernel.split('-')[1] + + kernel_type = kernel_lib.determine_kernel_type_from_uname(get_source_version(), conf.kernel) + return kernel_type == kernel_lib.KernelType.REALTIME def is_supported_version(): diff --git a/repos/system_upgrade/common/libraries/kernel.py b/repos/system_upgrade/common/libraries/kernel.py new file mode 100644 index 0000000000..ee1c5eae9c --- /dev/null +++ b/repos/system_upgrade/common/libraries/kernel.py @@ -0,0 +1,135 @@ +from collections import namedtuple + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.stdlib import api, CalledProcessError, run + +KernelPkgInfo = namedtuple('KernelPkgInfo', ('name', 'version', 'release', 'arch', 'nevra')) + + +KERNEL_UNAME_R_PROVIDES = ['kernel-uname-r', 'kernel-rt-uname-r'] + + +class KernelType(object): + ORDINARY = 'ordinary' + REALTIME = 'realtime' + + +def get_kernel_pkg_name(rhel_major_version, kernel_type): + """ + Get the name of the package providing kernel binaries. + + :param str rhel_major_version: RHEL major version + :param KernelType kernel_type: Type of the kernel + :returns: Kernel package name + :rtype: str + """ + if rhel_major_version == '7': + kernel_pkg_name_table = { + KernelType.ORDINARY: 'kernel', + KernelType.REALTIME: 'kernel-rt' + } + else: + kernel_pkg_name_table = { + KernelType.ORDINARY: 'kernel-core', + KernelType.REALTIME: 'kernel-rt-core' + } + return kernel_pkg_name_table[kernel_type] + + +def determine_kernel_type_from_uname(rhel_version, kernel_uname_r): + """ + Determine kernel type from given kernel release (uname-r). + + :param str rhel_version: Version of RHEL for which the kernel with the uname-r is targeted. + :param str kernel_uname_r: Kernel release (uname-r) + :returns: Kernel type based on a given uname_r + :rtype: KernelType + """ + version_fragments = rhel_version.split('.') + major_ver = version_fragments[0] + minor_ver = version_fragments[1] if len(version_fragments) > 1 else '0' + rhel_version = (major_ver, minor_ver) + + if rhel_version <= ('9', '2'): + uname_r_infixes = { + '.rt': KernelType.REALTIME + } + for infix, kernel_type in uname_r_infixes.items(): + if infix in kernel_uname_r: + return kernel_type + else: + uname_r_suffixes = { + '+rt': KernelType.REALTIME + } + + for suffix, kernel_type in uname_r_suffixes.items(): + if kernel_uname_r.endswith(suffix): + return kernel_type + + return KernelType.ORDINARY + + +def get_uname_r_provided_by_kernel_pkg(kernel_pkg_nevra): + """ + Get kernel release (uname-r) provided by a given kernel package. + + Calls the ``rpm`` command internally and might raise CalledProcessError if the rpm query fails. + + :param str kernel_pkg_nevra: NEVRA of an installed kernel package + :returns: uname-r provided by the given package + :rtype: str + """ + provides = run(['rpm', '-q', '--provides', kernel_pkg_nevra], split=True)['stdout'] + for provide_line in provides: + if '=' not in provide_line: + continue + provide, value = provide_line.split('=', 1) + provide = provide.strip() + if provide in KERNEL_UNAME_R_PROVIDES: + return value.strip() + return '' + + +def get_kernel_pkg_info(kernel_pkg_nevra): + """ + Query the RPM database for information about the given kernel package. + + Calls the ``rpm`` command internally and might raise CalledProcessError if the rpm query fails. + + :param str kernel_pkg_nevra: NEVRA of an installed kernel package + :returns: Information about the given kernel package + :rtype: KernelPkgInfo + """ + query_format = '%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|' + pkg_info = run(['rpm', '-q', '--queryformat', query_format, kernel_pkg_nevra])['stdout'].strip().split('|') + return KernelPkgInfo(name=pkg_info[0], version=pkg_info[1], release=pkg_info[2], arch=pkg_info[3], + nevra=kernel_pkg_nevra) + + +def get_kernel_pkg_info_for_uname_r(uname_r): + """ + Identify the kernel package providing a kernel with the given kernel release (uname-r). + + Raises ``StopActorExecutionError`` if no package provides given uname_r or if the internal rpm query fails. + :param str uname_r: NEVRA of an installed kernel package + :returns: Information about the kernel package providing given uname_r + :rtype: KernelPkgInfo + """ + kernel_pkg_nevras = [] + for kernel_uname_r_provide in KERNEL_UNAME_R_PROVIDES: + try: + kernel_pkg_nevras += run(['rpm', '-q', '--whatprovides', kernel_uname_r_provide], split=True)['stdout'] + except CalledProcessError: # There is nothing providing a particular provide, e.g, kernel-rt-uname-r + continue # Nothing bad happened, continue + + kernel_pkg_nevras = set(kernel_pkg_nevras) + + for kernel_pkg_nevra in kernel_pkg_nevras: + provided_uname = get_uname_r_provided_by_kernel_pkg(kernel_pkg_nevra) # We know all packages provide a uname + if not provided_uname: + api.current_logger().warning('Failed to obtain uname-r provided by %s', kernel_pkg_nevra) + if provided_uname == uname_r: + return get_kernel_pkg_info(kernel_pkg_nevra) + + raise StopActorExecutionError(message='Unable to obtain kernel information of the booted kernel: no package is ' + 'providing the booted kernel release returned by uname.') diff --git a/repos/system_upgrade/common/libraries/tests/test_kernel_lib.py b/repos/system_upgrade/common/libraries/tests/test_kernel_lib.py new file mode 100644 index 0000000000..a6696a3c94 --- /dev/null +++ b/repos/system_upgrade/common/libraries/tests/test_kernel_lib.py @@ -0,0 +1,78 @@ +import functools + +import pytest + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common import kernel as kernel_lib +from leapp.libraries.common.kernel import KernelType + + +@pytest.mark.parametrize( + ('rhel_version', 'uname_r', 'expected_kernel_type'), + ( + ('7.9', '3.10.0-1160.el7.x86_64', KernelType.ORDINARY), + ('7.9', '3.10.0-1160.rt56.1131.el7.x86_64', KernelType.REALTIME), + ('8.7', '4.18.0-425.3.1.el8.x86_64', KernelType.ORDINARY), + ('8.7', '4.18.0-425.3.1.rt7.213.el8.x86_64', KernelType.REALTIME), + ('9.2', '5.14.0-284.11.1.el9_2.x86_64', KernelType.ORDINARY), + ('9.2', '5.14.0-284.11.1.rt14.296.el9_2.x86_64', KernelType.REALTIME), + ('9.3', '5.14.0-354.el9.x86_64', KernelType.ORDINARY), + ('9.3', '5.14.0-354.el9.x86_64+rt', KernelType.REALTIME), + ) +) +def test_determine_kernel_type_from_uname(rhel_version, uname_r, expected_kernel_type): + kernel_type = kernel_lib.determine_kernel_type_from_uname(rhel_version, uname_r) + assert kernel_type == expected_kernel_type + + +def test_get_uname_r_provided_by_kernel_pkg(monkeypatch): + kernel_nevra = 'kernel-core-5.14.0-354.el9.x86_64' + + def run_mocked(cmd, *args, **kwargs): + assert cmd == ['rpm', '-q', '--provides', kernel_nevra] + output_lines = [ + 'kmod(virtio_ring.ko)', + 'kernel(zlib_inflate_blob) = 0x65408378', + 'kernel-uname-r = 5.14.0-354.el9.x86_64' + ] + return {'stdout': output_lines} + + monkeypatch.setattr(kernel_lib, 'run', run_mocked) + + uname_r = kernel_lib.get_uname_r_provided_by_kernel_pkg(kernel_nevra) + assert uname_r == '5.14.0-354.el9.x86_64' + + +@pytest.mark.parametrize('kernel_pkg_with_uname_installed', (True, False)) +def test_get_kernel_pkg_info_for_uname_r(monkeypatch, kernel_pkg_with_uname_installed): + uname_r = '5.14.0-354.el9.x86_64' if kernel_pkg_with_uname_installed else 'other-uname' + + def run_mocked(cmd, *args, **kwargs): + assert cmd[0:3] == ['rpm', '-q', '--whatprovides'] + output_lines = [ + 'kernel-core-5.14.0-354.el9.x86_64.rpm', + 'kernel-rt-core-5.14.0-354.el9.x86_64.rpm', + ] + return {'stdout': output_lines} + + def get_uname_provided_by_pkg_mocked(pkg_nevra): + nevra_uname_table = { + 'kernel-core-5.14.0-354.el9.x86_64.rpm': '5.14.0-354.el9.x86_64', + 'kernel-rt-core-5.14.0-354.el9.x86_64.rpm': '5.14.0-354.el9.x86_64+rt' + } + return nevra_uname_table[pkg_nevra] # Will raise if a different nevra is used than ones from run_mocked + + monkeypatch.setattr(kernel_lib, 'run', run_mocked) + monkeypatch.setattr(kernel_lib, 'get_uname_r_provided_by_kernel_pkg', get_uname_provided_by_pkg_mocked) + + mk_pkg_info = functools.partial(kernel_lib.KernelPkgInfo, name='', version='', release='', arch='') + monkeypatch.setattr(kernel_lib, + 'get_kernel_pkg_info', + lambda dummy_nevra: mk_pkg_info(nevra=dummy_nevra)) + + if kernel_pkg_with_uname_installed: + pkg_info = kernel_lib.get_kernel_pkg_info_for_uname_r(uname_r) + assert pkg_info == mk_pkg_info(nevra='kernel-core-5.14.0-354.el9.x86_64.rpm') + else: + with pytest.raises(StopActorExecutionError): + pkg_info = kernel_lib.get_kernel_pkg_info_for_uname_r(uname_r) diff --git a/repos/system_upgrade/common/libraries/utils.py b/repos/system_upgrade/common/libraries/utils.py index 943a6e0b30..36e798016a 100644 --- a/repos/system_upgrade/common/libraries/utils.py +++ b/repos/system_upgrade/common/libraries/utils.py @@ -176,3 +176,31 @@ def read_file(path): """ with open(path, 'r') as f: return f.read() + + +def require_exactly_one_message_of_type(model_class, error_class=StopActorExecutionError): + model_instances = api.consume(model_class) + model_instance = next(model_instances, None) + if not model_instance: + msg = 'Exactly one {cls_name} message of type is required, however, none was received.' + msg = msg.format(cls_name=model_class.__name__) + raise error_class(msg) + + next_instance = next(model_instances, None) + if next_instance: + msg = 'Exactly one {cls_name} message is required, however, more than one messages were received.' + msg = msg.format(cls_name=model_class.__name__) + raise error_class(msg) + + return model_instance + + +def require_some_message_of_type(model_class, error_class=StopActorExecutionError): + model_instances = api.consume(model_class) + model_instance = next(model_instances, None) + if not model_instance: + msg = 'Exactly one {cls_name} message of type is required, however, none was received.' + msg = msg.format(cls_name=model_class.__name__) + raise error_class(msg) + + return model_instance diff --git a/repos/system_upgrade/common/models/installedtargetkernelversion.py b/repos/system_upgrade/common/models/installedtargetkernelversion.py index 45a093b9d6..fbc0cd48d6 100644 --- a/repos/system_upgrade/common/models/installedtargetkernelversion.py +++ b/repos/system_upgrade/common/models/installedtargetkernelversion.py @@ -1,7 +1,9 @@ -from leapp.models import fields, Model +from leapp.models import fields, Model, RPM from leapp.topics import SystemInfoTopic +from leapp.utils.deprecation import deprecated +@deprecated(since='2023-08-03', message='The model has been deprecated in favour of InstalledTargetKernelInfo.') class InstalledTargetKernelVersion(Model): """ This message is used to propagate the version of the kernel that has been installed during the upgrade process. @@ -10,3 +12,39 @@ class InstalledTargetKernelVersion(Model): """ topic = SystemInfoTopic version = fields.String() + + +class KernelInfo(Model): + """ + Information about the booted kernel. + """ + topic = SystemInfoTopic + + pkg = fields.Model(RPM) + """ Package providing the booted kernel. """ + + uname_r = fields.String() + """``uname -r`` of the booted kernel.""" + + type = fields.StringEnum(['ordinary', 'realtime'], default='ordinary') + # @FixMe(mhecko): I want to use kernel_lib.KernelType here, but I cannot import any library code (yet). + # # Figure out how to do it. + + +class InstalledTargetKernelInfo(Model): + """Information about the installed target kernel.""" + topic = SystemInfoTopic + + pkg_nevra = fields.String() + """Name, epoch, version, release, arch of the target kernel package.""" + + # No one uses this information yet and scanning it it costly if running in debug (large + # RPM provides of kernel) flood stdout + # uname_r = fields.String() + # """Kernel release of the target kernel.""" + + kernel_img_path = fields.String() + """Path to the vmlinuz kernel image stored in ``/boot``.""" + + initramfs_path = fields.String() + """Path to the initramfs image stored in ``/boot``.""" diff --git a/repos/system_upgrade/common/models/ipuconfig.py b/repos/system_upgrade/common/models/ipuconfig.py index aa42378408..6e7e21b582 100644 --- a/repos/system_upgrade/common/models/ipuconfig.py +++ b/repos/system_upgrade/common/models/ipuconfig.py @@ -53,7 +53,9 @@ class IPUConfig(Model): """Architecture of the system. E.g.: 'x86_64'.""" kernel = fields.String() - """Originally booted kernel when on the source system.""" + """ + Originally booted kernel when on the source system. + """ flavour = fields.StringEnum(('default', 'saphana'), default='default') """Flavour of the upgrade - Used to influence changes in supported source/target release"""