Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slither-mutate: bugfix when two files have the same name #2357

Merged
merged 9 commits into from
Mar 18, 2024
Merged
33 changes: 22 additions & 11 deletions slither/tools/mutator/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import shutil
import sys
import time
from pathlib import Path
from typing import Type, List, Any, Optional
from crytic_compile import cryticparser
from slither import Slither
Expand Down Expand Up @@ -147,7 +148,7 @@ def __call__(
###################################################################################


def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,too-many-locals
def main() -> None: # pylint: disable=too-many-statements,too-many-branches,too-many-locals
args = parse_args()

# arguments
Expand All @@ -160,7 +161,6 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
verbose: Optional[bool] = args.verbose
very_verbose: Optional[bool] = args.very_verbose
mutators_to_run: Optional[List[str]] = args.mutators_to_run
contract_names: Optional[List[str]] = args.contract_names
comprehensive_flag: Optional[bool] = args.comprehensive

logger.info(blue(f"Starting mutation campaign in {args.codebase}"))
Expand All @@ -171,14 +171,19 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
else:
paths_to_ignore_list = []

contract_names: List[str] = []
if args.contract_names:
contract_names = args.contract_names.split(",")

# get all the contracts as a list from given codebase
sol_file_list: List[str] = get_sol_file_list(args.codebase, paths_to_ignore_list)
sol_file_list: List[str] = get_sol_file_list(Path(args.codebase), paths_to_ignore_list)

# folder where backup files and uncaught mutants are saved
if output_dir is None:
output_dir = "/mutation_campaign"
output_folder = os.getcwd() + output_dir
if os.path.exists(output_folder):
output_dir = "./mutation_campaign"

output_folder = Path(output_dir).resolve()
if output_folder.is_dir():
shutil.rmtree(output_folder)

# setting RR mutator as first mutator
Expand Down Expand Up @@ -226,6 +231,9 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
)
)

# Keep a list of all already mutated contracts so we don't mutate them twice
mutated_contracts: List[str] = []

for filename in sol_file_list: # pylint: disable=too-many-nested-blocks
file_name = os.path.split(filename)[1].split(".sol")[0]
# slither object
Expand All @@ -240,16 +248,16 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
dont_mutate_lines = []

# mutation
target_contract = ""
target_contract = "SLITHER_SKIP_MUTATIONS" if contract_names else ""
try:
for compilation_unit_of_main_file in sl.compilation_units:
for contract in compilation_unit_of_main_file.contracts:
if contract_names is not None and contract.name in contract_names:
if contract.name in contract_names and contract.name not in mutated_contracts:
target_contract = contract
elif contract_names is not None and contract.name not in contract_names:
target_contract = "SLITHER_SKIP_MUTATIONS"
elif contract.name.lower() == file_name.lower():
break
elif not contract_names and contract.name.lower() == file_name.lower():
target_contract = contract
break

if target_contract == "":
logger.info(
Expand All @@ -267,6 +275,8 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
logger.debug(f"Skipping mutations on interface {filename}")
continue

# Add our target to the mutation list
mutated_contracts.append(target_contract.name)
logger.info(blue(f"Mutating contract {target_contract}"))
for M in mutators_list:
m = M(
Expand Down Expand Up @@ -322,6 +332,7 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t

except Exception as e: # pylint: disable=broad-except
logger.error(e)
transfer_and_delete(files_dict)

except KeyboardInterrupt:
# transfer and delete the backup files if interrupted
Expand Down
8 changes: 5 additions & 3 deletions slither/tools/mutator/mutators/abstract_mutator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple, List
from slither.core.compilation_unit import SlitherCompilationUnit
from slither.formatters.utils.patches import apply_patch, create_diff
Expand Down Expand Up @@ -29,7 +30,7 @@ def __init__( # pylint: disable=too-many-arguments
solc_remappings: str | None,
verbose: bool,
very_verbose: bool,
output_folder: str,
output_folder: Path,
dont_mutate_line: List[int],
rate: int = 10,
seed: Optional[int] = None,
Expand Down Expand Up @@ -89,6 +90,7 @@ def mutate(self) -> Tuple[List[int], List[int], List[int]]:
for patch in patches:
# test the patch
patchWasCaught = test_patch(
self.output_folder,
file,
patch,
self.test_command,
Expand Down Expand Up @@ -116,8 +118,8 @@ def mutate(self) -> Tuple[List[int], List[int], List[int]]:
logger.info(f"Impossible to generate patch; empty {patches}")

# add uncaught mutant patches to a output file
with open(
self.output_folder + "/patches_file.txt", "a", encoding="utf8"
with (self.output_folder / "patches_files.txt").open(
"a", encoding="utf8"
) as patches_file:
patches_file.write(diff + "\n")

Expand Down
100 changes: 39 additions & 61 deletions slither/tools/mutator/utils/file_handling.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,76 @@
import os
import traceback
from typing import Dict, List
from typing import Dict, List, Union
import logging
from pathlib import Path
import hashlib

logger = logging.getLogger("Slither-Mutate")

duplicated_files = {}
HashedPath = str
backuped_files: Dict[str, HashedPath] = {}


def backup_source_file(source_code: Dict, output_folder: str) -> Dict:
def backup_source_file(source_code: Dict, output_folder: Path) -> Dict[str, HashedPath]:
"""
function to backup the source file
returns: dictionary of duplicated files
"""
os.makedirs(output_folder, exist_ok=True)

output_folder.mkdir(exist_ok=True, parents=True)
for file_path, content in source_code.items():
directory, filename = os.path.split(file_path)
new_filename = f"{output_folder}/backup_{filename}"
new_file_path = os.path.join(directory, new_filename)
path_hash = hashlib.md5(bytes(file_path, "utf8")).hexdigest()
(output_folder / path_hash).write_text(content, encoding="utf8")

with open(new_file_path, "w", encoding="utf8") as new_file:
new_file.write(content)
duplicated_files[file_path] = new_file_path
backuped_files[file_path] = (output_folder / path_hash).as_posix()

return duplicated_files
return backuped_files


def transfer_and_delete(files_dict: Dict) -> None:
def transfer_and_delete(files_dict: Dict[str, HashedPath]) -> None:
"""function to transfer the original content to the sol file after campaign"""
try:
files_dict_copy = files_dict.copy()
for item, value in files_dict_copy.items():
with open(value, "r", encoding="utf8") as duplicated_file:
for original_path, hashed_path in files_dict_copy.items():
with open(hashed_path, "r", encoding="utf8") as duplicated_file:
content = duplicated_file.read()

with open(item, "w", encoding="utf8") as original_file:
with open(original_path, "w", encoding="utf8") as original_file:
original_file.write(content)

os.remove(value)
Path(hashed_path).unlink()

# delete elements from the global dict
del duplicated_files[item]
del backuped_files[original_path]

except Exception as e: # pylint: disable=broad-except
logger.error(f"Error transferring content: {e}")
except FileNotFoundError as e: # pylint: disable=broad-except
logger.error(f"Error transferring content: %s", e)


global_counter = {}


def create_mutant_file(file: str, rule: str) -> None:
def create_mutant_file(output_folder: Path, file: str, rule: str) -> None:
"""function to create new mutant file"""
try:
if rule not in global_counter:
global_counter[rule] = 0
_, filename = os.path.split(file)

file_path = Path(file)
# Read content from the duplicated file
with open(file, "r", encoding="utf8") as source_file:
content = source_file.read()
content = file_path.read_text(encoding="utf8")

# Write content to the original file
mutant_name = filename.split(".")[0]

mutant_name = file_path.stem
# create folder for each contract
os.makedirs("mutation_campaign/" + mutant_name, exist_ok=True)
with open(
"mutation_campaign/"
+ mutant_name
+ "/"
+ mutant_name
+ "_"
+ rule
+ "_"
+ str(global_counter[rule])
+ ".sol",
"w",
encoding="utf8",
) as mutant_file:
mutation_dir = output_folder / mutant_name
mutation_dir.mkdir(parents=True, exist_ok=True)

mutation_filename = f"{mutant_name}_{rule}_{global_counter[rule]}.sol"
with (mutation_dir / mutation_filename).open("w", encoding="utf8") as mutant_file:
mutant_file.write(content)
global_counter[rule] += 1

# reset the file
with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file:
duplicate_content = duplicated_file.read()
duplicate_content = Path(backuped_files[file]).read_text("utf8")

with open(file, "w", encoding="utf8") as source_file:
source_file.write(duplicate_content)
Expand All @@ -97,19 +84,18 @@ def create_mutant_file(file: str, rule: str) -> None:
def reset_file(file: str) -> None:
"""function to reset the file"""
try:
# directory, filename = os.path.split(file)
# reset the file
with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file:
with open(backuped_files[file], "r", encoding="utf8") as duplicated_file:
duplicate_content = duplicated_file.read()

with open(file, "w", encoding="utf8") as source_file:
source_file.write(duplicate_content)

except Exception as e: # pylint: disable=broad-except
logger.error(f"Error resetting file: {e}")
logger.error(f"Error resetting file: %s", e)


def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str]:
def get_sol_file_list(codebase: Path, ignore_paths: Union[List[str], None]) -> List[str]:
"""
function to get the contracts list
returns: list of .sol files
Expand All @@ -119,21 +105,13 @@ def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str
ignore_paths = []

# if input is contract file
if os.path.isfile(codebase):
return [codebase]
if codebase.is_file():
return [codebase.as_posix()]

# if input is folder
if os.path.isdir(codebase):
directory = os.path.abspath(codebase)
for file in os.listdir(directory):
filename = os.path.join(directory, file)
if os.path.isfile(filename):
sol_file_list.append(filename)
elif os.path.isdir(filename):
_, dirname = os.path.split(filename)
if dirname in ignore_paths:
continue
for i in get_sol_file_list(filename, ignore_paths):
sol_file_list.append(i)
if codebase.is_dir():
for file_name in codebase.rglob("*.sol"):
if not any(part in ignore_paths for part in file_name.parts):
sol_file_list.append(file_name.as_posix())

return sol_file_list
11 changes: 8 additions & 3 deletions slither/tools/mutator/utils/testing_generated_mutant.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import sys
import subprocess
from typing import Dict
from pathlib import Path
from typing import Dict, Union
import crytic_compile
from slither.tools.mutator.utils.file_handling import create_mutant_file, reset_file
from slither.utils.colors import green, red, yellow
Expand Down Expand Up @@ -77,12 +78,13 @@ def run_test_cmd(cmd: str, timeout: int | None, target_file: str | None, verbose

# return 0 if uncaught, 1 if caught, and 2 if compilation fails
def test_patch( # pylint: disable=too-many-arguments
output_folder: Path,
file: str,
patch: Dict,
command: str,
generator_name: str,
timeout: int,
mappings: str | None,
mappings: Union[str, None],
verbose: bool,
very_verbose: bool,
) -> int:
Expand All @@ -97,15 +99,18 @@ def test_patch( # pylint: disable=too-many-arguments
# Write the modified content back to the file
with open(file, "w", encoding="utf-8") as filepath:
filepath.write(replaced_content)

if compile_generated_mutant(file, mappings):
if run_test_cmd(command, timeout, file, False):
create_mutant_file(file, generator_name)

create_mutant_file(output_folder, file, generator_name)
logger.info(
red(
f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> UNCAUGHT"
)
)
reset_file(file)

return 0 # uncaught
else:
if very_verbose:
Expand Down
Loading