From 48f7d14b62c0ff1646aa6afb051b2f630f3f7723 Mon Sep 17 00:00:00 2001 From: Sahana CB Date: Tue, 27 Feb 2024 10:22:39 -0800 Subject: [PATCH] Add cli for ICD Summary: - Adds CLI for ICD Reviewed By: mbhatt1 Differential Revision: D54237525 fbshipit-source-id: 7eda81fb945c0ef88d60a7a9fda1945d4dec630b --- .../insecure_code_detector/cli.py | 71 +++++++++++++++++++ .../insecure_code_detector.py | 33 ++++++--- .../insecure_code_detector/issues.py | 5 ++ 3 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 CybersecurityBenchmarks/insecure_code_detector/cli.py diff --git a/CybersecurityBenchmarks/insecure_code_detector/cli.py b/CybersecurityBenchmarks/insecure_code_detector/cli.py new file mode 100644 index 000000000..8ff66aa9e --- /dev/null +++ b/CybersecurityBenchmarks/insecure_code_detector/cli.py @@ -0,0 +1,71 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from __future__ import annotations + +import argparse +import asyncio +import logging +import time + +from . import insecure_code_detector, languages + +LOG: logging.Logger = logging.getLogger(__name__) + + +def validate_arguments( + path: str, + language: str, +) -> None: + # TODO: Path validation + if language not in languages.get_supported_languages(): + raise ValueError(f"Language {language} is not supported.") + + +async def main() -> None: + parser = argparse.ArgumentParser(description="Run the Insecure Code Detector.") + + parser.add_argument( + "-p", + "--path", + required=True, + help="Specify the path on which you want to run the Insecure Code Detector.", + ) + + parser.add_argument( + "-l", + "--language", + required=True, + choices=languages.get_supported_languages(), + help="Specify the language of the code you want to run the Insecure Code Detector on.", + ) + + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + validate_arguments(args.path, args.language) + + logging.basicConfig( + format="%(asctime)s [%(levelname)s] %(message)s", + level=logging.INFO if not args.debug else logging.DEBUG, + ) + LOG.info(f"Running Insecure Code Detector on {args.path} in {args.language}...") + start_time = time.monotonic() + results = await insecure_code_detector.analyze( + args.language, "", None, None, args.path + ) + for result in results: + print(result.to_json()) + end_time = time.monotonic() + logging.info("ICD took %dms", (end_time - start_time) * 1000) + logging.info("Found {} issues".format(len(results))) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/CybersecurityBenchmarks/insecure_code_detector/insecure_code_detector.py b/CybersecurityBenchmarks/insecure_code_detector/insecure_code_detector.py index 6cdf69480..f84935e99 100644 --- a/CybersecurityBenchmarks/insecure_code_detector/insecure_code_detector.py +++ b/CybersecurityBenchmarks/insecure_code_detector/insecure_code_detector.py @@ -75,6 +75,7 @@ class CodeContext: before: str | None after: str | None path: Path + is_existing_path: bool = False async def analyze( @@ -82,22 +83,31 @@ async def analyze( generated_code: str, code_before: Optional[str] = None, code_after: Optional[str] = None, + path: Optional[Path] = None, ) -> List[Issue]: """ Checks if the string contains insecure code. :return: True if the string is insecure, False otherwise. """ result = [] - code_context = CodeContext( - language, - code_before, - code_after, - _write_code_to_file( + is_path_preexisting = True if path else False + file_path = ( + path + if is_path_preexisting + else _write_code_to_file( (code_before if code_before else "") + generated_code + (code_after if code_after else ""), languages.get_file_extension(language), - ), + ) + ) + + code_context = CodeContext( + language, + code_before, + code_after, + file_path or Path(), + is_path_preexisting, ) # Create tasks for each analyzer @@ -131,7 +141,8 @@ async def analyze( # end of static analysis: clean up try: - os.remove(code_context.path) + if not is_path_preexisting: + os.remove(code_context.path) except OSError: LOG.warn("File created for static analysis could not be deleted.") @@ -303,6 +314,8 @@ async def _weggli_analyze( ["-X"] if code_context.language == Language.CPP else [] ) result = await _run_all_weggli_tasks(patterns, command_base, code_context.path) + if code_context.is_existing_path: + return result filered_results = _filter_issues_in_generated_code( result, generated_code, code_context.before ) @@ -351,7 +364,9 @@ async def _semgrep_analyze( ) output = await _run_command(command) result = _parse_semgrep_issues(output) - filered_results = _filter_issues_in_generated_code( + if code_context.is_existing_path: + return result + filtered_result = _filter_issues_in_generated_code( result, generated_code, code_context.before ) - return filered_results + return filtered_result diff --git a/CybersecurityBenchmarks/insecure_code_detector/issues.py b/CybersecurityBenchmarks/insecure_code_detector/issues.py index 9ef7a87f7..82eabd4d3 100644 --- a/CybersecurityBenchmarks/insecure_code_detector/issues.py +++ b/CybersecurityBenchmarks/insecure_code_detector/issues.py @@ -5,6 +5,8 @@ from __future__ import annotations +import json + from dataclasses import dataclass from .analyzers import Analyzer @@ -24,3 +26,6 @@ class Issue: replacement: str | None = None analyzer: Analyzer | None = None pattern_id: str | None = None + + def to_json(self) -> str: + return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)