Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented automatic conversion process for legacy config #84

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 93 additions & 23 deletions wordfence/cli/configurer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import namedtuple
from configparser import ConfigParser, DuplicateSectionError
from multiprocessing import cpu_count
from typing import Optional, List, Dict
from typing import Optional, List, Dict, TextIO

from wordfence.util.input import prompt, prompt_yes_no, prompt_int, \
InvalidInputException
Expand All @@ -17,22 +17,31 @@


CONFIG_SECTION_DEFAULT = 'DEFAULT'
LEGACY_CONFIG_SECTION = 'SCAN'
LEGACY_CONFIG_KEYS = {
'license',
'cache_directory',
'workers'
}
LEGACY_CONVERSION_SECTION = 'MALWARE_SCAN'


ConfigUpdate = namedtuple('ConfigUpdate', ['section', 'key', 'value'])
ConfigValue = namedtuple('ConfigValue', ['section', 'key', 'value'])


class ConfigWriter:
class ConfigFileManager:

def __init__(
self,
config
):
self.config = config
self.parser = None
self._read = False

def initialize_parser(self) -> None:
self.parser = ConfigParser()
self._read = False

def require_parser(self) -> None:
if self.parser is None:
Expand All @@ -49,28 +58,36 @@ def require_section(self, section: str) -> None:
except DuplicateSectionError:
pass

def apply_update(self, update: ConfigUpdate) -> None:
def apply_update(self, update: ConfigValue) -> None:
self.require_parser()
self.require_section(update.section)
self.parser.set(update.section, update.key, update.value)

def write(self, updates: List[ConfigUpdate]) -> None:
# TODO: What if the INI file changes after the config is loaded?
self.initialize_parser()
def resolve_ini_path(self) -> str:
ini_path = self.config.ini_path if self.config.has_ini_file() \
else self.config.configuration
ini_path = resolve_path(ini_path)
return resolve_path(ini_path)

def read_existing_config(self, file: TextIO, ini_path: str) -> None:
try:
if not self._read:
self.parser.read_file(file)
except BaseException:
log.warning(
'Failed to read existing config file at '
f'{ini_path}. existing data will be truncated.'
)
self._read = True

def write(self, updates: List[ConfigValue]) -> None:
# TODO: What if the INI file changes after the config is loaded?
self.require_parser()
ini_path = self.resolve_ini_path()
ensure_file_is_writable(ini_path)
open_mode = 'r' if self.config.has_ini_file() else 'w'
with open(ini_path, open_mode + '+') as file:
if self.config.has_ini_file():
try:
self.parser.read_file(file)
except BaseException:
log.warning(
'Failed to read existing config file at '
f'{ini_path}. existing data will be truncated.'
)
self.read_existing_config(file, ini_path)

for update in updates:
self.apply_update(update)
Expand All @@ -82,6 +99,21 @@ def write(self, updates: List[ConfigUpdate]) -> None:
self.written = True
log.info(f'Config saved to {ini_path}')

def read(self) -> List[ConfigValue]:
values = []
self.initialize_parser()
ini_path = self.resolve_ini_path()
with open(ini_path, 'r') as file:
self.read_existing_config(file, ini_path)
for section_name, section_proxy in self.parser.items():
for key, value in section_proxy.items():
values.append(ConfigValue(section_name, key, value))
return values

def delete_section(self, section: str) -> None:
self.require_parser()
self.parser.remove_section(section)


class Configurer:

Expand All @@ -95,11 +127,17 @@ def __init__(
self.config = config
self.all_config = {}
self.all_config[config.subcommand] = config
self.config_updates = []
self.config_values = []
self.terms_manager = terms_manager
self.subcommand_definition = subcommand_definition
self.subcommand_definitions = subcommand_definitions
self.written = False
self.config_file_manager = None

def get_config_file_manager(self) -> ConfigFileManager:
if self.config_file_manager is None:
self.config_file_manager = ConfigFileManager(self.config)
return self.config_file_manager

def get_config(self, subcommand: str):
if subcommand not in self.all_config:
Expand Down Expand Up @@ -238,24 +276,28 @@ def _prompt_for_worker_count(self) -> int:
)
return processes

def read_config(self) -> List[ConfigValue]:
manager = self.get_config_file_manager()
return manager.read()

def write_config(self) -> None:
writer = ConfigWriter(self.config)
writer.write(self.config_updates)
manager = self.get_config_file_manager()
manager.write(self.config_values)

def update_config(
self,
key: str,
value: str,
section: str = 'DEFAULT'
) -> None:
self.config_updates.append(
ConfigUpdate(section, key, str(value))
self.config_values.append(
ConfigValue(section, key, str(value))
)
if self.supports_option(key):
setattr(self.config, key, value)

def prompt_for_config(self) -> bool:
if not self._prompt_overwrite():
def prompt_for_config(self, overwrite: bool = False) -> bool:
if not overwrite and not self._prompt_overwrite():
return False
has_existing_config = self.config.has_ini_file()
self.update_config(
Expand Down Expand Up @@ -284,6 +326,33 @@ def prompt_for_config(self) -> bool:
)
return True

def convert_legacy_config(self) -> bool:
values = self.read_config()
has_legacy_config = False
for value in values:
if not value.section == LEGACY_CONFIG_SECTION:
continue
if value.key in LEGACY_CONFIG_KEYS:
setattr(self.config, value.key, value.value)
has_legacy_config = True
else:
self.update_config(
value.key,
value.value,
LEGACY_CONVERSION_SECTION
)
if not has_legacy_config:
return False
should_convert = prompt_yes_no(
'A configuration file for an older version of Wordfence CLI '
'was detected; would you like to update it now?',
default=True
)
if should_convert:
self.config_file_manager.delete_section(LEGACY_CONFIG_SECTION)
self.prompt_for_config(overwrite=True)
return True

def prompt_for_missing_config(self) -> bool:
should_configure = prompt_yes_no(
'Wordfence CLI cannot be used until it has been configured. '
Expand All @@ -300,5 +369,6 @@ def check_config(self) -> bool:
if self.has_base_config():
return True
else:
self.prompt_for_missing_config()
if not self.convert_legacy_config():
self.prompt_for_missing_config()
return False