diff --git a/slither/tools/mutator/__main__.py b/slither/tools/mutator/__main__.py index 5e5efba00..15a695a9e 100644 --- a/slither/tools/mutator/__main__.py +++ b/slither/tools/mutator/__main__.py @@ -5,6 +5,7 @@ import shutil import sys import time +from pathlib import Path from typing import Type, List, Any, Optional from crytic_compile import cryticparser from slither import Slither @@ -147,7 +148,7 @@ def __call__( ################################################################################### -def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,too-many-locals +def main() -> None: # pylint: disable=too-many-statements,too-many-branches,too-many-locals args = parse_args() # arguments @@ -160,7 +161,6 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t verbose: Optional[bool] = args.verbose very_verbose: Optional[bool] = args.very_verbose mutators_to_run: Optional[List[str]] = args.mutators_to_run - contract_names: Optional[List[str]] = args.contract_names comprehensive_flag: Optional[bool] = args.comprehensive logger.info(blue(f"Starting mutation campaign in {args.codebase}")) @@ -171,14 +171,19 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t else: paths_to_ignore_list = [] + contract_names: List[str] = [] + if args.contract_names: + contract_names = args.contract_names.split(",") + # get all the contracts as a list from given codebase - sol_file_list: List[str] = get_sol_file_list(args.codebase, paths_to_ignore_list) + sol_file_list: List[str] = get_sol_file_list(Path(args.codebase), paths_to_ignore_list) # folder where backup files and uncaught mutants are saved if output_dir is None: - output_dir = "/mutation_campaign" - output_folder = os.getcwd() + output_dir - if os.path.exists(output_folder): + output_dir = "./mutation_campaign" + + output_folder = Path(output_dir).resolve() + if output_folder.is_dir(): shutil.rmtree(output_folder) # setting RR mutator as first mutator @@ -226,6 +231,9 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t ) ) + # Keep a list of all already mutated contracts so we don't mutate them twice + mutated_contracts: List[str] = [] + for filename in sol_file_list: # pylint: disable=too-many-nested-blocks file_name = os.path.split(filename)[1].split(".sol")[0] # slither object @@ -240,16 +248,16 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t dont_mutate_lines = [] # mutation - target_contract = "" + target_contract = "SLITHER_SKIP_MUTATIONS" if contract_names else "" try: for compilation_unit_of_main_file in sl.compilation_units: for contract in compilation_unit_of_main_file.contracts: - if contract_names is not None and contract.name in contract_names: + if contract.name in contract_names and contract.name not in mutated_contracts: target_contract = contract - elif contract_names is not None and contract.name not in contract_names: - target_contract = "SLITHER_SKIP_MUTATIONS" - elif contract.name.lower() == file_name.lower(): + break + elif not contract_names and contract.name.lower() == file_name.lower(): target_contract = contract + break if target_contract == "": logger.info( @@ -267,6 +275,8 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t logger.debug(f"Skipping mutations on interface {filename}") continue + # Add our target to the mutation list + mutated_contracts.append(target_contract.name) logger.info(blue(f"Mutating contract {target_contract}")) for M in mutators_list: m = M( @@ -322,6 +332,7 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t except Exception as e: # pylint: disable=broad-except logger.error(e) + transfer_and_delete(files_dict) except KeyboardInterrupt: # transfer and delete the backup files if interrupted diff --git a/slither/tools/mutator/mutators/abstract_mutator.py b/slither/tools/mutator/mutators/abstract_mutator.py index 2d1e68107..69c77a4ca 100644 --- a/slither/tools/mutator/mutators/abstract_mutator.py +++ b/slither/tools/mutator/mutators/abstract_mutator.py @@ -1,5 +1,6 @@ import abc import logging +from pathlib import Path from typing import Optional, Dict, Tuple, List from slither.core.compilation_unit import SlitherCompilationUnit from slither.formatters.utils.patches import apply_patch, create_diff @@ -29,7 +30,7 @@ def __init__( # pylint: disable=too-many-arguments solc_remappings: str | None, verbose: bool, very_verbose: bool, - output_folder: str, + output_folder: Path, dont_mutate_line: List[int], rate: int = 10, seed: Optional[int] = None, @@ -89,6 +90,7 @@ def mutate(self) -> Tuple[List[int], List[int], List[int]]: for patch in patches: # test the patch patchWasCaught = test_patch( + self.output_folder, file, patch, self.test_command, @@ -116,8 +118,8 @@ def mutate(self) -> Tuple[List[int], List[int], List[int]]: logger.info(f"Impossible to generate patch; empty {patches}") # add uncaught mutant patches to a output file - with open( - self.output_folder + "/patches_file.txt", "a", encoding="utf8" + with (self.output_folder / "patches_files.txt").open( + "a", encoding="utf8" ) as patches_file: patches_file.write(diff + "\n") diff --git a/slither/tools/mutator/utils/file_handling.py b/slither/tools/mutator/utils/file_handling.py index 8c435302c..15f2b9506 100644 --- a/slither/tools/mutator/utils/file_handling.py +++ b/slither/tools/mutator/utils/file_handling.py @@ -1,89 +1,76 @@ -import os import traceback -from typing import Dict, List +from typing import Dict, List, Union import logging +from pathlib import Path +import hashlib logger = logging.getLogger("Slither-Mutate") -duplicated_files = {} +HashedPath = str +backuped_files: Dict[str, HashedPath] = {} -def backup_source_file(source_code: Dict, output_folder: str) -> Dict: +def backup_source_file(source_code: Dict, output_folder: Path) -> Dict[str, HashedPath]: """ function to backup the source file returns: dictionary of duplicated files """ - os.makedirs(output_folder, exist_ok=True) - + output_folder.mkdir(exist_ok=True, parents=True) for file_path, content in source_code.items(): - directory, filename = os.path.split(file_path) - new_filename = f"{output_folder}/backup_{filename}" - new_file_path = os.path.join(directory, new_filename) + path_hash = hashlib.md5(bytes(file_path, "utf8")).hexdigest() + (output_folder / path_hash).write_text(content, encoding="utf8") - with open(new_file_path, "w", encoding="utf8") as new_file: - new_file.write(content) - duplicated_files[file_path] = new_file_path + backuped_files[file_path] = (output_folder / path_hash).as_posix() - return duplicated_files + return backuped_files -def transfer_and_delete(files_dict: Dict) -> None: +def transfer_and_delete(files_dict: Dict[str, HashedPath]) -> None: """function to transfer the original content to the sol file after campaign""" try: files_dict_copy = files_dict.copy() - for item, value in files_dict_copy.items(): - with open(value, "r", encoding="utf8") as duplicated_file: + for original_path, hashed_path in files_dict_copy.items(): + with open(hashed_path, "r", encoding="utf8") as duplicated_file: content = duplicated_file.read() - with open(item, "w", encoding="utf8") as original_file: + with open(original_path, "w", encoding="utf8") as original_file: original_file.write(content) - os.remove(value) + Path(hashed_path).unlink() # delete elements from the global dict - del duplicated_files[item] + del backuped_files[original_path] - except Exception as e: # pylint: disable=broad-except - logger.error(f"Error transferring content: {e}") + except FileNotFoundError as e: # pylint: disable=broad-except + logger.error(f"Error transferring content: %s", e) global_counter = {} -def create_mutant_file(file: str, rule: str) -> None: +def create_mutant_file(output_folder: Path, file: str, rule: str) -> None: """function to create new mutant file""" try: if rule not in global_counter: global_counter[rule] = 0 - _, filename = os.path.split(file) + + file_path = Path(file) # Read content from the duplicated file - with open(file, "r", encoding="utf8") as source_file: - content = source_file.read() + content = file_path.read_text(encoding="utf8") # Write content to the original file - mutant_name = filename.split(".")[0] - + mutant_name = file_path.stem # create folder for each contract - os.makedirs("mutation_campaign/" + mutant_name, exist_ok=True) - with open( - "mutation_campaign/" - + mutant_name - + "/" - + mutant_name - + "_" - + rule - + "_" - + str(global_counter[rule]) - + ".sol", - "w", - encoding="utf8", - ) as mutant_file: + mutation_dir = output_folder / mutant_name + mutation_dir.mkdir(parents=True, exist_ok=True) + + mutation_filename = f"{mutant_name}_{rule}_{global_counter[rule]}.sol" + with (mutation_dir / mutation_filename).open("w", encoding="utf8") as mutant_file: mutant_file.write(content) global_counter[rule] += 1 # reset the file - with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file: - duplicate_content = duplicated_file.read() + duplicate_content = Path(backuped_files[file]).read_text("utf8") with open(file, "w", encoding="utf8") as source_file: source_file.write(duplicate_content) @@ -97,19 +84,18 @@ def create_mutant_file(file: str, rule: str) -> None: def reset_file(file: str) -> None: """function to reset the file""" try: - # directory, filename = os.path.split(file) # reset the file - with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file: + with open(backuped_files[file], "r", encoding="utf8") as duplicated_file: duplicate_content = duplicated_file.read() with open(file, "w", encoding="utf8") as source_file: source_file.write(duplicate_content) except Exception as e: # pylint: disable=broad-except - logger.error(f"Error resetting file: {e}") + logger.error(f"Error resetting file: %s", e) -def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str]: +def get_sol_file_list(codebase: Path, ignore_paths: Union[List[str], None]) -> List[str]: """ function to get the contracts list returns: list of .sol files @@ -119,21 +105,13 @@ def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str ignore_paths = [] # if input is contract file - if os.path.isfile(codebase): - return [codebase] + if codebase.is_file(): + return [codebase.as_posix()] # if input is folder - if os.path.isdir(codebase): - directory = os.path.abspath(codebase) - for file in os.listdir(directory): - filename = os.path.join(directory, file) - if os.path.isfile(filename): - sol_file_list.append(filename) - elif os.path.isdir(filename): - _, dirname = os.path.split(filename) - if dirname in ignore_paths: - continue - for i in get_sol_file_list(filename, ignore_paths): - sol_file_list.append(i) + if codebase.is_dir(): + for file_name in codebase.rglob("*.sol"): + if not any(part in ignore_paths for part in file_name.parts): + sol_file_list.append(file_name.as_posix()) return sol_file_list diff --git a/slither/tools/mutator/utils/testing_generated_mutant.py b/slither/tools/mutator/utils/testing_generated_mutant.py index 22ee20e06..a1f54df3b 100644 --- a/slither/tools/mutator/utils/testing_generated_mutant.py +++ b/slither/tools/mutator/utils/testing_generated_mutant.py @@ -1,7 +1,8 @@ import logging import sys import subprocess -from typing import Dict +from pathlib import Path +from typing import Dict, Union import crytic_compile from slither.tools.mutator.utils.file_handling import create_mutant_file, reset_file from slither.utils.colors import green, red, yellow @@ -77,12 +78,13 @@ def run_test_cmd(cmd: str, timeout: int | None, target_file: str | None, verbose # return 0 if uncaught, 1 if caught, and 2 if compilation fails def test_patch( # pylint: disable=too-many-arguments + output_folder: Path, file: str, patch: Dict, command: str, generator_name: str, timeout: int, - mappings: str | None, + mappings: Union[str, None], verbose: bool, very_verbose: bool, ) -> int: @@ -97,15 +99,18 @@ def test_patch( # pylint: disable=too-many-arguments # Write the modified content back to the file with open(file, "w", encoding="utf-8") as filepath: filepath.write(replaced_content) + if compile_generated_mutant(file, mappings): if run_test_cmd(command, timeout, file, False): - create_mutant_file(file, generator_name) + + create_mutant_file(output_folder, file, generator_name) logger.info( red( f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> UNCAUGHT" ) ) reset_file(file) + return 0 # uncaught else: if very_verbose: