Fixes in restore procedure
1. Always unpack before running the task.
2. Always verify the Kubernetes version. Move enrichment of the version to the kubernetes module.
3. Recreate inventory if the version was restored from backup.
ilia1243 committed Oct 13, 2023
1 parent 6f070f2 commit 9fbf63a
Showing 8 changed files with 157 additions and 57 deletions.
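Taken together, the three fixes reorder the restore procedure so that the backup is unpacked and its Kubernetes version is folded into the inventory before any task executes. A minimal sketch of the resulting sequence (the function names mirror the diff below; the surrounding scaffolding, such as how resources is obtained, is assumed):

# Illustrative only: the reworked restore sequence, not the actual entry point.
def run_restore(resources):
    # (1) Unpack the backup first, so that context['backup_descriptor'] exists
    #     before the cluster inventory is compiled.
    unpack_data(resources)

    # (2) During enrichment, kubemarine.kubernetes.enrich_restore_inventory verifies
    #     the Kubernetes version recorded in the descriptor, and
    # (3) pins it in services.kubeadm.kubernetesVersion so the flow can recreate
    #     the inventory from the restored version.
    flow.run_actions(resources, [RestoreAction()])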
1 change: 1 addition & 0 deletions kubemarine/core/defaults.py
@@ -36,6 +36,7 @@
"kubemarine.controlplane.controlplane_node_enrichment",
"kubemarine.core.defaults.append_controlplain",
"kubemarine.kubernetes.enrich_upgrade_inventory",
"kubemarine.kubernetes.enrich_restore_inventory",
"kubemarine.core.defaults.compile_inventory",
"kubemarine.core.defaults.manage_true_false_values",
"kubemarine.plugins.enrich_upgrade_inventory",
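The new entry slots kubemarine.kubernetes.enrich_restore_inventory into the ordered list of enrichment functions above. A rough sketch of how such dotted paths could be resolved and applied in turn during inventory compilation; only the list itself and the (inventory, cluster) call shape come from this commit, the resolution mechanics below are assumed:

import importlib

def apply_enrichment(inventory: dict, cluster: object, enrichment_paths: list) -> dict:
    # Assumed pipeline mechanics: resolve each dotted path and call the functions in order.
    for path in enrichment_paths:
        module_name, func_name = path.rsplit('.', 1)
        enrich = getattr(importlib.import_module(module_name), func_name)
        # Each enrichment receives the inventory and the cluster and returns the
        # (possibly replaced) inventory, as enrich_restore_inventory does below.
        inventory = enrich(inventory, cluster)
    return inventory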
21 changes: 11 additions & 10 deletions kubemarine/core/flow.py
@@ -172,10 +172,11 @@ def run_tasks(resources: res.DynamicResources, tasks: dict, cumulative_points: d
else [] if not args.get('tasks') else args['tasks'].split(",")
excluded_tasks = [] if not args.get('exclude') else args['exclude'].split(",")

print("Excluded tasks:")
filtered_tasks, final_list = filter_flow(tasks, tasks_filter, excluded_tasks)
logger = resources.logger()
logger.debug("Excluded tasks:")
filtered_tasks, final_list = filter_flow(tasks, tasks_filter, excluded_tasks, logger)
if filtered_tasks == tasks:
print("\tNo excluded tasks")
logger.debug("\tNo excluded tasks")

cluster = resources.cluster()

@@ -220,8 +221,6 @@ def create_context(parser: argparse.ArgumentParser, cli_arguments: Optional[list

if args.get('exclude_cumulative_points_methods', '').strip() != '':
args['exclude_cumulative_points_methods'] = args['exclude_cumulative_points_methods'].strip().split(",")
# print('The following cumulative points methods are marked for exclusion: [ %s ]' %
# ', '.join(args['exclude_cumulative_points_methods']))
else:
args['exclude_cumulative_points_methods'] = []

@@ -231,16 +230,18 @@ def create_context(parser: argparse.ArgumentParser, cli_arguments: Optional[list
return context


def filter_flow(tasks: dict, tasks_filter: List[str], excluded_tasks: List[str]) -> Tuple[dict, List[str]]:
def filter_flow(tasks: dict, tasks_filter: List[str], excluded_tasks: List[str],
logger: log.EnhancedLogger = None) -> Tuple[dict, List[str]]:
# Remove any whitespaces from filters, and split by '.'
tasks_path_filter = [tasks.split(".") for tasks in list(map(str.strip, tasks_filter))]
excluded_path_tasks = [tasks.split(".") for tasks in list(map(str.strip, excluded_tasks))]

return _filter_flow_internal(tasks, tasks_path_filter, excluded_path_tasks, [])
return _filter_flow_internal(tasks, tasks_path_filter, excluded_path_tasks, [], logger)


def _filter_flow_internal(tasks: dict, tasks_filter: List[List[str]], excluded_tasks: List[List[str]],
_task_path: List[str]) -> Tuple[dict, List[str]]:
_task_path: List[str],
logger: log.EnhancedLogger = None) -> Tuple[dict, List[str]]:
filtered = {}
final_list = []

@@ -271,8 +272,8 @@ def _filter_flow_internal(tasks: dict, tasks_filter: List[List[str]], excluded_t
if filtered_flow:
filtered[task_name] = filtered_flow
final_list += _final_list
else:
print("\t%s" % __task_name)
elif logger:
logger.debug("\t%s" % __task_name)

return filtered, final_list

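With the optional logger argument, excluded tasks are reported through logger.debug instead of print. A hedged usage sketch of the updated signature; the task tree and filters are made up, and resources is assumed to be a DynamicResources instance:

tasks = {
    "prepare": {
        "unpack": lambda cluster: None,        # placeholder task bodies
        "stop_cluster": lambda cluster: None,
    },
    "restore": {
        "etcd": lambda cluster: None,
    },
}

logger = resources.logger()
filtered, final_list = filter_flow(tasks, [], ["prepare.unpack"], logger)
# The excluded task is reported via logger.debug rather than stdout, and the
# returned filtered dict no longer contains it; omitting the logger keeps the
# filtering silent.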
1 change: 1 addition & 0 deletions kubemarine/core/utils.py
@@ -180,6 +180,7 @@ def get_final_inventory(c: object, initial_inventory: dict = None) -> dict:
inventory_finalize_functions = {
add_node.add_node_finalize_inventory,
remove_node.remove_node_finalize_inventory,
kubernetes.restore_finalize_inventory,
kubernetes.upgrade_finalize_inventory,
thirdparties.upgrade_finalize_inventory,
plugins.upgrade_finalize_inventory,
29 changes: 29 additions & 0 deletions kubemarine/kubernetes/__init__.py
@@ -97,6 +97,35 @@ def generic_upgrade_inventory(cluster: KubernetesCluster, inventory: dict) -> di
return inventory


def enrich_restore_inventory(inventory: dict, cluster: KubernetesCluster) -> dict:
if cluster.context.get("initial_procedure") != "restore":
return inventory

logger = cluster.log
kubernetes_descriptor = cluster.context['backup_descriptor'].setdefault('kubernetes', {})
initial_kubernetes_version = get_initial_kubernetes_version(inventory)
backup_kubernetes_version = kubernetes_descriptor.get('version')
if not backup_kubernetes_version:
logger.debug("Not possible to verify Kubernetes version, as descriptor does not contain 'kubernetes.version'")
backup_kubernetes_version = initial_kubernetes_version

if backup_kubernetes_version != initial_kubernetes_version:
logger.warning('Installed kubernetes version does not match version from backup')
verify_allowed_version(backup_kubernetes_version)

kubernetes_descriptor['version'] = backup_kubernetes_version
return restore_finalize_inventory(cluster, inventory)


def restore_finalize_inventory(cluster: KubernetesCluster, inventory: dict) -> dict:
if cluster.context.get("initial_procedure") != "restore":
return inventory

target_kubernetes_version = cluster.context['backup_descriptor']['kubernetes']['version']
inventory.setdefault("services", {}).setdefault("kubeadm", {})['kubernetesVersion'] = target_kubernetes_version
return inventory


def enrich_inventory(inventory: dict, _: KubernetesCluster) -> dict:
kubeadm = inventory['services']['kubeadm']
kubeadm['dns'].setdefault('imageRepository', f"{kubeadm['imageRepository']}/coredns")
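Because restore_finalize_inventory only reads cluster.context, the finalization step can be illustrated with a stub object; the version string below is a placeholder and the real KubernetesCluster carries far more state:

from kubemarine.kubernetes import restore_finalize_inventory

class StubCluster:
    context = {
        'initial_procedure': 'restore',
        'backup_descriptor': {'kubernetes': {'version': 'v1.27.1'}},  # placeholder version
    }

# The version recorded in the backup descriptor is pinned in the inventory.
inventory = restore_finalize_inventory(StubCluster(), {})
assert inventory == {'services': {'kubeadm': {'kubernetesVersion': 'v1.27.1'}}}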
40 changes: 22 additions & 18 deletions kubemarine/procedures/backup.py
@@ -72,14 +72,14 @@ def get_default_backup_files_list(cluster: KubernetesCluster) -> List[str]:
return backup_files_list


def prepare_backup_tmpdir(cluster: KubernetesCluster) -> str:
backup_directory = cluster.context.get('backup_tmpdir')
def prepare_backup_tmpdir(logger: log.EnhancedLogger, context: dict) -> str:
backup_directory = context.get('backup_tmpdir')
if not backup_directory:
cluster.log.verbose('Backup directory is not ready yet, preparing..')
backup_directory = cluster.context['backup_tmpdir'] = utils.get_dump_filepath(cluster.context, 'backup')
logger.verbose('Backup directory is not ready yet, preparing..')
backup_directory = context['backup_tmpdir'] = utils.get_dump_filepath(context, 'backup')
shutil.rmtree(backup_directory, ignore_errors=True)
os.mkdir(backup_directory)
cluster.log.verbose('Backup directory prepared')
logger.verbose('Backup directory prepared')
return backup_directory


@@ -90,7 +90,7 @@ def verify_backup_location(cluster: KubernetesCluster) -> None:


def export_ansible_inventory(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)
shutil.copyfile(cluster.context['execution_arguments']['ansible_inventory_location'],
os.path.join(backup_directory, 'ansible-inventory.ini'))
cluster.log.verbose('ansible-inventory.ini exported to backup')
@@ -116,7 +116,7 @@ def export_hostname(cluster: KubernetesCluster) -> None:


def export_cluster_yaml(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)
shutil.copyfile(utils.get_dump_filepath(cluster.context, 'cluster.yaml'),
os.path.join(backup_directory, 'cluster.yaml'))
shutil.copyfile(utils.get_external_resource_path(cluster.context['execution_arguments']['config']),
@@ -125,7 +125,7 @@ def export_cluster_yaml(cluster: KubernetesCluster) -> None:


def export_nodes(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)
backup_nodes_data_dir = os.path.join(backup_directory, 'nodes_data')
os.mkdir(backup_nodes_data_dir)

@@ -158,7 +158,7 @@ def export_nodes(cluster: KubernetesCluster) -> None:


def export_etcd(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)
etcd_node, is_custom_etcd_node = select_etcd_node(cluster)
cluster.context['backup_descriptor']['etcd']['image'] = retrieve_etcd_image(cluster, etcd_node)

@@ -566,7 +566,7 @@ def _download(self, task: DownloaderPayload, temp_local_filepath: str) -> None:


def export_kubernetes(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)
control_plane = cluster.nodes['control-plane'].get_any_member()
backup_kubernetes = cluster.procedure_inventory.get('backup_plan', {}).get('kubernetes', {})
logger = cluster.log
@@ -655,7 +655,7 @@ def _graceful_shutdown_downloaders() -> Optional[BaseException]:


def make_descriptor(cluster: KubernetesCluster) -> None:
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)

cluster.context['backup_descriptor']['kubernetes']['thirdparties'] = cluster.inventory['services']['thirdparties']
cluster.context['backup_descriptor']['meta']['time']['finished'] = datetime.datetime.now()
@@ -666,7 +666,7 @@ def make_descriptor(cluster: KubernetesCluster) -> None:

def pack_data(cluster: KubernetesCluster) -> None:
cluster_name = cluster.inventory['cluster_name']
backup_directory = prepare_backup_tmpdir(cluster)
backup_directory = prepare_backup_tmpdir(cluster.log, cluster.context)

backup_filename = 'backup-%s-%s.tar.gz' % (cluster_name, utils.get_current_timestamp_formatted())

@@ -675,17 +675,21 @@ def pack_data(cluster: KubernetesCluster) -> None:
target = os.path.join(target, backup_filename)

cluster.log.debug('Packing all data...')
with tarfile.open(target, "w:gz") as tar_handle:
for root, dirs, files in os.walk(backup_directory):
for file in files:
pathname = os.path.join(root, file)
tar_handle.add(pathname, pathname.replace(backup_directory, ''))
tar_handle.close()
pack_to_tgz(target, backup_directory)

cluster.log.verbose('Cleaning up...')
shutil.rmtree(backup_directory, ignore_errors=True)


def pack_to_tgz(target_archive: str, source_dir: str) -> None:
with tarfile.open(target_archive, "w:gz") as tar_handle:
for root, dirs, files in os.walk(source_dir):
for file in files:
pathname = os.path.join(root, file)
tar_handle.add(pathname, pathname.replace(source_dir, ''))
tar_handle.close()


tasks = OrderedDict({
"verify_backup_location": verify_backup_location,
"export": {
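Extracting pack_to_tgz leaves pack_data as a thin wrapper and gives other procedures a reusable helper. A short usage sketch; the paths are placeholders:

from kubemarine.procedures.backup import pack_to_tgz

# Pack a staging directory into a gzipped tarball; each entry is stored with the
# source directory prefix stripped from its archived name.
pack_to_tgz('/tmp/backup-demo.tar.gz', '/tmp/backup-staging')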
47 changes: 18 additions & 29 deletions kubemarine/procedures/restore.py
@@ -24,7 +24,7 @@

import yaml

from kubemarine.core import utils, flow, defaults
from kubemarine.core import utils, flow
from kubemarine.core.action import Action
from kubemarine.core.cluster import KubernetesCluster
from kubemarine.core.resources import DynamicResources
@@ -58,9 +58,11 @@ def replace_config_from_backup_if_needed(procedure_inventory_filepath: str, conf
tar.close()


def unpack_data(cluster: KubernetesCluster) -> None:
backup_tmp_directory = backup.prepare_backup_tmpdir(cluster)
backup_file_source = cluster.procedure_inventory.get('backup_location')
def unpack_data(resources: DynamicResources) -> None:
logger = resources.logger()
context = resources.context
backup_tmp_directory = backup.prepare_backup_tmpdir(logger, context)
backup_file_source = resources.procedure_inventory().get('backup_location')

if not backup_file_source:
raise Exception('Backup source not specified in procedure')
@@ -69,13 +71,13 @@ def unpack_data(cluster: KubernetesCluster) -> None:
if not os.path.isfile(backup_file_source):
raise FileNotFoundError('Backup file "%s" not found' % backup_file_source)

cluster.log.debug('Unpacking all data...')
logger.debug('Unpacking all data...')
with tarfile.open(backup_file_source, 'r:gz') as tar:
for member in tar:
if member.isdir():
continue
fname = os.path.join(backup_tmp_directory, member.name)
cluster.log.debug(fname)
logger.debug(fname)
fname_parts = fname.split('/')
if len(fname_parts) > 1:
fname_dir = "/".join(fname_parts[:-1])
@@ -89,25 +91,7 @@ def unpack_data(cluster: KubernetesCluster) -> None:
raise FileNotFoundError('Descriptor not found in backup file')

with utils.open_external(descriptor_filepath, 'r') as stream:
cluster.context['backup_descriptor'] = yaml.safe_load(stream)


def verify_backup_data(cluster: KubernetesCluster) -> None:
if not cluster.context['backup_descriptor'].get('kubernetes', {}).get('version'):
cluster.log.debug('Not possible to verify Kubernetes version, because descriptor do not contain such information')
return

if cluster.context['backup_descriptor']['kubernetes']['version'] != cluster.inventory['services']['kubeadm']['kubernetesVersion']:
cluster.log.warning('Installed kubernetes versions do not match version from backup')
cluster.log.verbose('Cluster re-parse required')
if not cluster.raw_inventory.get('services'):
cluster.raw_inventory['services'] = {}
if not cluster.raw_inventory['services'].get('kubeadm'):
cluster.raw_inventory['services']['kubeadm'] = {}
cluster.raw_inventory['services']['kubeadm']['kubernetesVersion'] = cluster.context['backup_descriptor']['kubernetes']['version']
cluster._inventory = defaults.enrich_inventory(cluster, cluster.raw_inventory)
else:
cluster.log.debug('Kubernetes version from backup is correct')
context['backup_descriptor'] = yaml.safe_load(stream)


def stop_cluster(cluster: KubernetesCluster) -> None:
@@ -278,8 +262,6 @@ def reboot(cluster: KubernetesCluster) -> None:

tasks = OrderedDict({
"prepare": {
"unpack": unpack_data,
"verify_backup_data": verify_backup_data,
"stop_cluster": stop_cluster,
},
"restore": {
@@ -293,12 +275,19 @@ def reboot(cluster: KubernetesCluster) -> None:
})


class RestoreFlow(flow.Flow):
def _run(self, resources: DynamicResources) -> None:
unpack_data(resources)
flow.run_actions(resources, [RestoreAction()])


class RestoreAction(Action):
def __init__(self) -> None:
super().__init__('restore')
super().__init__('restore', recreate_inventory=True)

def run(self, res: DynamicResources) -> None:
flow.run_tasks(res, tasks)
res.make_final_inventory()


def main(cli_arguments: List[str] = None) -> None:
@@ -316,7 +305,7 @@ def main(cli_arguments: List[str] = None) -> None:

replace_config_from_backup_if_needed(args['procedure_config'], args['config'])

flow.ActionsFlow([RestoreAction()]).run_flow(context)
RestoreFlow().run_flow(context)


if __name__ == '__main__':
2 changes: 2 additions & 0 deletions test/unit/core/test_schema.py
@@ -107,6 +107,8 @@ def test_procedure_examples_valid(self):
self.fail(f"Unknown procedure for inventory {relpath}")

context = demo.create_silent_context(procedure=procedure)
if procedure == 'restore':
context['backup_descriptor'] = {}
inventory = demo.generate_inventory(**demo.MINIHA)

# check that enrichment is successful and the inventory is valid against the schema