Skip to content

Commit

Permalink
Implemented automatic conversion process for legacy config
Browse files Browse the repository at this point in the history
  • Loading branch information
akenion committed Oct 24, 2023
1 parent 9958ea0 commit 64f7133
Showing 1 changed file with 93 additions and 23 deletions.
116 changes: 93 additions & 23 deletions wordfence/cli/configurer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import namedtuple
from configparser import ConfigParser, DuplicateSectionError
from multiprocessing import cpu_count
from typing import Optional, List, Dict
from typing import Optional, List, Dict, TextIO

from wordfence.util.input import prompt, prompt_yes_no, prompt_int, \
InvalidInputException
Expand All @@ -17,22 +17,31 @@


CONFIG_SECTION_DEFAULT = 'DEFAULT'
LEGACY_CONFIG_SECTION = 'SCAN'
LEGACY_CONFIG_KEYS = {
'license',
'cache_directory',
'workers'
}
LEGACY_CONVERSION_SECTION = 'MALWARE_SCAN'


ConfigUpdate = namedtuple('ConfigUpdate', ['section', 'key', 'value'])
ConfigValue = namedtuple('ConfigValue', ['section', 'key', 'value'])


class ConfigWriter:
class ConfigFileManager:

def __init__(
self,
config
):
self.config = config
self.parser = None
self._read = False

def initialize_parser(self) -> None:
self.parser = ConfigParser()
self._read = False

def require_parser(self) -> None:
if self.parser is None:
Expand All @@ -49,28 +58,36 @@ def require_section(self, section: str) -> None:
except DuplicateSectionError:
pass

def apply_update(self, update: ConfigUpdate) -> None:
def apply_update(self, update: ConfigValue) -> None:
self.require_parser()
self.require_section(update.section)
self.parser.set(update.section, update.key, update.value)

def write(self, updates: List[ConfigUpdate]) -> None:
# TODO: What if the INI file changes after the config is loaded?
self.initialize_parser()
def resolve_ini_path(self) -> str:
ini_path = self.config.ini_path if self.config.has_ini_file() \
else self.config.configuration
ini_path = resolve_path(ini_path)
return resolve_path(ini_path)

def read_existing_config(self, file: TextIO, ini_path: str) -> None:
try:
if not self._read:
self.parser.read_file(file)
except BaseException:
log.warning(
'Failed to read existing config file at '
f'{ini_path}. existing data will be truncated.'
)
self._read = True

def write(self, updates: List[ConfigValue]) -> None:
# TODO: What if the INI file changes after the config is loaded?
self.require_parser()
ini_path = self.resolve_ini_path()
ensure_file_is_writable(ini_path)
open_mode = 'r' if self.config.has_ini_file() else 'w'
with open(ini_path, open_mode + '+') as file:
if self.config.has_ini_file():
try:
self.parser.read_file(file)
except BaseException:
log.warning(
'Failed to read existing config file at '
f'{ini_path}. existing data will be truncated.'
)
self.read_existing_config(file, ini_path)

for update in updates:
self.apply_update(update)
Expand All @@ -82,6 +99,21 @@ def write(self, updates: List[ConfigUpdate]) -> None:
self.written = True
log.info(f'Config saved to {ini_path}')

def read(self) -> List[ConfigValue]:
values = []
self.initialize_parser()
ini_path = self.resolve_ini_path()
with open(ini_path, 'r') as file:
self.read_existing_config(file, ini_path)
for section_name, section_proxy in self.parser.items():
for key, value in section_proxy.items():
values.append(ConfigValue(section_name, key, value))
return values

def delete_section(self, section: str) -> None:
self.require_parser()
self.parser.remove_section(section)


class Configurer:

Expand All @@ -95,11 +127,17 @@ def __init__(
self.config = config
self.all_config = {}
self.all_config[config.subcommand] = config
self.config_updates = []
self.config_values = []
self.terms_manager = terms_manager
self.subcommand_definition = subcommand_definition
self.subcommand_definitions = subcommand_definitions
self.written = False
self.config_file_manager = None

def get_config_file_manager(self) -> ConfigFileManager:
if self.config_file_manager is None:
self.config_file_manager = ConfigFileManager(self.config)
return self.config_file_manager

def get_config(self, subcommand: str):
if subcommand not in self.all_config:
Expand Down Expand Up @@ -238,24 +276,28 @@ def _prompt_for_worker_count(self) -> int:
)
return processes

def read_config(self) -> List[ConfigValue]:
manager = self.get_config_file_manager()
return manager.read()

def write_config(self) -> None:
writer = ConfigWriter(self.config)
writer.write(self.config_updates)
manager = self.get_config_file_manager()
manager.write(self.config_values)

def update_config(
self,
key: str,
value: str,
section: str = 'DEFAULT'
) -> None:
self.config_updates.append(
ConfigUpdate(section, key, str(value))
self.config_values.append(
ConfigValue(section, key, str(value))
)
if self.supports_option(key):
setattr(self.config, key, value)

def prompt_for_config(self) -> bool:
if not self._prompt_overwrite():
def prompt_for_config(self, overwrite: bool = False) -> bool:
if not overwrite and not self._prompt_overwrite():
return False
has_existing_config = self.config.has_ini_file()
self.update_config(
Expand Down Expand Up @@ -284,6 +326,33 @@ def prompt_for_config(self) -> bool:
)
return True

def convert_legacy_config(self) -> bool:
values = self.read_config()
has_legacy_config = False
for value in values:
if not value.section == LEGACY_CONFIG_SECTION:
continue
if value.key in LEGACY_CONFIG_KEYS:
setattr(self.config, value.key, value.value)
has_legacy_config = True
else:
self.update_config(
value.key,
value.value,
LEGACY_CONVERSION_SECTION
)
if not has_legacy_config:
return False
should_convert = prompt_yes_no(
'A configuration file for an older version of Wordfence CLI '
'was detected; would you like to update it now?',
default=True
)
if should_convert:
self.config_file_manager.delete_section(LEGACY_CONFIG_SECTION)
self.prompt_for_config(overwrite=True)
return True

def prompt_for_missing_config(self) -> bool:
should_configure = prompt_yes_no(
'Wordfence CLI cannot be used until it has been configured. '
Expand All @@ -300,5 +369,6 @@ def check_config(self) -> bool:
if self.has_base_config():
return True
else:
self.prompt_for_missing_config()
if not self.convert_legacy_config():
self.prompt_for_missing_config()
return False

0 comments on commit 64f7133

Please sign in to comment.