From 64f7133ed0e2eada5d1119f58affd1dd07283251 Mon Sep 17 00:00:00 2001 From: Alex Kenion Date: Tue, 24 Oct 2023 16:12:53 -0400 Subject: [PATCH] Implemented automatic conversion process for legacy config --- wordfence/cli/configurer.py | 116 +++++++++++++++++++++++++++++------- 1 file changed, 93 insertions(+), 23 deletions(-) diff --git a/wordfence/cli/configurer.py b/wordfence/cli/configurer.py index e72993f3..9ffe7c36 100644 --- a/wordfence/cli/configurer.py +++ b/wordfence/cli/configurer.py @@ -1,7 +1,7 @@ from collections import namedtuple from configparser import ConfigParser, DuplicateSectionError from multiprocessing import cpu_count -from typing import Optional, List, Dict +from typing import Optional, List, Dict, TextIO from wordfence.util.input import prompt, prompt_yes_no, prompt_int, \ InvalidInputException @@ -17,12 +17,19 @@ CONFIG_SECTION_DEFAULT = 'DEFAULT' +LEGACY_CONFIG_SECTION = 'SCAN' +LEGACY_CONFIG_KEYS = { + 'license', + 'cache_directory', + 'workers' + } +LEGACY_CONVERSION_SECTION = 'MALWARE_SCAN' -ConfigUpdate = namedtuple('ConfigUpdate', ['section', 'key', 'value']) +ConfigValue = namedtuple('ConfigValue', ['section', 'key', 'value']) -class ConfigWriter: +class ConfigFileManager: def __init__( self, @@ -30,9 +37,11 @@ def __init__( ): self.config = config self.parser = None + self._read = False def initialize_parser(self) -> None: self.parser = ConfigParser() + self._read = False def require_parser(self) -> None: if self.parser is None: @@ -49,28 +58,36 @@ def require_section(self, section: str) -> None: except DuplicateSectionError: pass - def apply_update(self, update: ConfigUpdate) -> None: + def apply_update(self, update: ConfigValue) -> None: self.require_parser() self.require_section(update.section) self.parser.set(update.section, update.key, update.value) - def write(self, updates: List[ConfigUpdate]) -> None: - # TODO: What if the INI file changes after the config is loaded? - self.initialize_parser() + def resolve_ini_path(self) -> str: ini_path = self.config.ini_path if self.config.has_ini_file() \ else self.config.configuration - ini_path = resolve_path(ini_path) + return resolve_path(ini_path) + + def read_existing_config(self, file: TextIO, ini_path: str) -> None: + try: + if not self._read: + self.parser.read_file(file) + except BaseException: + log.warning( + 'Failed to read existing config file at ' + f'{ini_path}. existing data will be truncated.' + ) + self._read = True + + def write(self, updates: List[ConfigValue]) -> None: + # TODO: What if the INI file changes after the config is loaded? + self.require_parser() + ini_path = self.resolve_ini_path() ensure_file_is_writable(ini_path) open_mode = 'r' if self.config.has_ini_file() else 'w' with open(ini_path, open_mode + '+') as file: if self.config.has_ini_file(): - try: - self.parser.read_file(file) - except BaseException: - log.warning( - 'Failed to read existing config file at ' - f'{ini_path}. existing data will be truncated.' - ) + self.read_existing_config(file, ini_path) for update in updates: self.apply_update(update) @@ -82,6 +99,21 @@ def write(self, updates: List[ConfigUpdate]) -> None: self.written = True log.info(f'Config saved to {ini_path}') + def read(self) -> List[ConfigValue]: + values = [] + self.initialize_parser() + ini_path = self.resolve_ini_path() + with open(ini_path, 'r') as file: + self.read_existing_config(file, ini_path) + for section_name, section_proxy in self.parser.items(): + for key, value in section_proxy.items(): + values.append(ConfigValue(section_name, key, value)) + return values + + def delete_section(self, section: str) -> None: + self.require_parser() + self.parser.remove_section(section) + class Configurer: @@ -95,11 +127,17 @@ def __init__( self.config = config self.all_config = {} self.all_config[config.subcommand] = config - self.config_updates = [] + self.config_values = [] self.terms_manager = terms_manager self.subcommand_definition = subcommand_definition self.subcommand_definitions = subcommand_definitions self.written = False + self.config_file_manager = None + + def get_config_file_manager(self) -> ConfigFileManager: + if self.config_file_manager is None: + self.config_file_manager = ConfigFileManager(self.config) + return self.config_file_manager def get_config(self, subcommand: str): if subcommand not in self.all_config: @@ -238,9 +276,13 @@ def _prompt_for_worker_count(self) -> int: ) return processes + def read_config(self) -> List[ConfigValue]: + manager = self.get_config_file_manager() + return manager.read() + def write_config(self) -> None: - writer = ConfigWriter(self.config) - writer.write(self.config_updates) + manager = self.get_config_file_manager() + manager.write(self.config_values) def update_config( self, @@ -248,14 +290,14 @@ def update_config( value: str, section: str = 'DEFAULT' ) -> None: - self.config_updates.append( - ConfigUpdate(section, key, str(value)) + self.config_values.append( + ConfigValue(section, key, str(value)) ) if self.supports_option(key): setattr(self.config, key, value) - def prompt_for_config(self) -> bool: - if not self._prompt_overwrite(): + def prompt_for_config(self, overwrite: bool = False) -> bool: + if not overwrite and not self._prompt_overwrite(): return False has_existing_config = self.config.has_ini_file() self.update_config( @@ -284,6 +326,33 @@ def prompt_for_config(self) -> bool: ) return True + def convert_legacy_config(self) -> bool: + values = self.read_config() + has_legacy_config = False + for value in values: + if not value.section == LEGACY_CONFIG_SECTION: + continue + if value.key in LEGACY_CONFIG_KEYS: + setattr(self.config, value.key, value.value) + has_legacy_config = True + else: + self.update_config( + value.key, + value.value, + LEGACY_CONVERSION_SECTION + ) + if not has_legacy_config: + return False + should_convert = prompt_yes_no( + 'A configuration file for an older version of Wordfence CLI ' + 'was detected; would you like to update it now?', + default=True + ) + if should_convert: + self.config_file_manager.delete_section(LEGACY_CONFIG_SECTION) + self.prompt_for_config(overwrite=True) + return True + def prompt_for_missing_config(self) -> bool: should_configure = prompt_yes_no( 'Wordfence CLI cannot be used until it has been configured. ' @@ -300,5 +369,6 @@ def check_config(self) -> bool: if self.has_base_config(): return True else: - self.prompt_for_missing_config() + if not self.convert_legacy_config(): + self.prompt_for_missing_config() return False