diff --git a/.github/workflows/task_runner_e2e.yml b/.github/workflows/task_runner_e2e.yml new file mode 100644 index 0000000000..a1e52df6a3 --- /dev/null +++ b/.github/workflows/task_runner_e2e.yml @@ -0,0 +1,95 @@ +#--------------------------------------------------------------------------- +# Workflow to run Task Runner end to end tests +# Authors - Noopur, Payal Chaurasiya +#--------------------------------------------------------------------------- +name: Task Runner E2E + +on: + schedule: + - cron: '0 0 * * *' # Run every day at midnight + workflow_dispatch: + inputs: + num_rounds: + description: 'Number of rounds to train' + required: false + default: "5" + type: string + num_collaborators: + description: 'Number of collaborators' + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + +jobs: + test_run: + name: test + runs-on: ubuntu-22.04 + timeout-minutes: 120 # 2 hours + strategy: + matrix: + # There are open issues for some of the models, so excluding them for now: + # model_name: [ "torch_cnn_mnist", "keras_cnn_mnist", "torch_cnn_histology" ] + model_name: [ "torch_cnn_mnist", "keras_cnn_mnist" ] + python_version: [ "3.8", "3.9", "3.10" ] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + id: setup_python + uses: actions/setup-python@v3 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + id: install_dependencies + run: | + python -m pip install --upgrade pip + pip install . + pip install -r test-requirements.txt + + - name: Run Task Runner E2E tests + id: run_task_runner_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py -m ${{ env.MODEL_NAME }} --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --model_name ${{ env.MODEL_NAME }} + echo "Task runner end to end test run completed" + + - name: Print test summary # Print the test summary only if the tests were run + id: print_test_summary + if: steps.run_task_runner_tests.outcome == 'success' || steps.run_task_runner_tests.outcome == 'failure' + run: | + export PYTHONPATH="$PYTHONPATH:." 
+ python tests/end_to_end/utils/xml_helper.py + echo "Test summary printed" + + - name: Tar files # Tar the test results only if the tests were run + id: tar_files + if: steps.run_task_runner_tests.outcome == 'success' || steps.run_task_runner_tests.outcome == 'failure' + run: tar -cvf result.tar results + + - name: Upload Artifacts # Upload the test results only if the tar was created + id: upload_artifacts + uses: actions/upload-artifact@v4 + if: steps.tar_files.outcome == 'success' + with: + name: task_runner_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} + path: result.tar diff --git a/.gitignore b/.gitignore index 578b6ed112..8a106933ef 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ venv/* .eggs eggs/* *.pyi +results/* \ No newline at end of file diff --git a/test-requirements.txt b/test-requirements.txt index 80ed75cde5..19bf081db1 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,4 @@ +lxml==5.3.0 pytest==8.3.3 pytest-asyncio==0.24.0 pytest-mock==3.14.0 diff --git a/tests/end_to_end/README.md b/tests/end_to_end/README.md new file mode 100644 index 0000000000..3971b67986 --- /dev/null +++ b/tests/end_to_end/README.md @@ -0,0 +1,60 @@ +# End-to-end Pytest Framework + +This project provides integration testing of ```openfl-workspace``` using the pytest framework. + +## Test Structure + +``` +tests/end_to_end +├── models # Central location for all model-related code for testing purposes +├── test_suites # Folder containing test files +├── utils # Folder containing helper files +├── conftest.py # Pytest framework configuration file +├── pytest.ini # Pytest initialisation file +└── README.md # Readme file +``` + +** The file `sample_tests.py` provided under `test_suites` acts as a reference for adding a new test case. + +## Pre-requisites + +1. Set up a virtual environment and install OpenFL using the [online documentation](https://openfl.readthedocs.io/en/latest/get_started/installation.html). +2. Ensure that the OpenFL workspace (inside openfl-workspace) is present for the model being tested. If not, create it first. + +## Installation + +To install the required dependencies in the above virtual environment, run: + +```sh +pip install -r test-requirements.txt +``` + +## Usage + +### Running Tests + +To run a specific test case, use the command below: + +```sh +python -m pytest tests/end_to_end/test_suites/ -k <test_case_name> -s +``` + +** -s ensures all the logs are printed on screen. Omit it if not required. + +To modify the number of collaborators, the number of rounds to train and/or the model name, use the parameters below: +1. --num_collaborators +2. --num_rounds +3. --model_name + +### Output Structure + +``` +results + ├── <workspace_name> # Based on the workspace name provided during the test run. + ├── results.xml # Output file in JUnit XML format. + └── deployment.log # Log file containing step-by-step test progress.
+``` + +## Contribution + +https://github.com/securefederatedai/openfl/blob/develop/CONTRIBUTING.md diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py new file mode 100644 index 0000000000..3ccffb0e0f --- /dev/null +++ b/tests/end_to_end/conftest.py @@ -0,0 +1,282 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import collections +import os +import shutil +import xml.etree.ElementTree as ET +import logging + +from tests.end_to_end.utils.logger import configure_logging +from tests.end_to_end.utils.logger import logger as log +from tests.end_to_end.utils.conftest_helper import parse_arguments +import tests.end_to_end.utils.constants as constants +import tests.end_to_end.models.participants as participants + +# Define a named tuple to store the objects for model owner, aggregator, and collaborators +federation_fixture = collections.namedtuple( + "federation_fixture", + "model_owner, aggregator, collaborators, model_name, workspace_path, results_dir", +) + + +def pytest_addoption(parser): + """ + Add custom command line options to the pytest parser. + Args: + parser: pytest parser object + """ + parser.addini("results_dir", "Directory to store test results", default="results") + parser.addini("log_level", "Logging level", default="DEBUG") + parser.addoption( + "--results_dir", action="store", type=str, default="results", help="Results directory" + ) + parser.addoption( + "--num_collaborators", + action="store", + type=int, + default=constants.NUM_COLLABORATORS, + help="Number of collaborators", + ) + parser.addoption( + "--num_rounds", + action="store", + type=int, + default=constants.NUM_ROUNDS, + help="Number of rounds to train", + ) + parser.addoption( + "--model_name", + action="store", + type=str, + default=constants.DEFAULT_MODEL_NAME, + help="Model name", + ) + + +@pytest.fixture(scope="session", autouse=True) +def setup_logging(pytestconfig): + """ + Setup logging for the test session. + Args: + pytestconfig: pytest config object + Returns: + logger: logger object + """ + results_dir = pytestconfig.getini("results_dir") + log_level = pytestconfig.getini("log_level") + + if not os.path.exists(results_dir): + os.makedirs(results_dir) + + # Setup a global logger to ensure logging works before any test-specific logs are set + configure_logging(os.path.join(results_dir, "deployment.log"), log_level) + return logging.getLogger() + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + """ + Hook to capture the result of setup, call, and teardown phases. + This avoids duplicate entries for Pass/Fail in the XML report. 
+ """ + outcome = yield + report = outcome.get_result() + + # Retrieve the custom test_id marker if it exists + test_id_marker = item.get_closest_marker("test_id") + outcome_mapping = {"passed": "Pass", "failed": "Fail"} + report_when_mapping = {"setup": "Setup", "call": "Test", "teardown": "Teardown"} + final_outcome = outcome_mapping.get(report.outcome, report.outcome) + report_phase = report_when_mapping.get(report.when, report.when) + + # Modify nodeid if test_id is provided and append outcome and phase + if test_id_marker: + test_id = test_id_marker.args[0] + report.nodeid = ( + f"{report.nodeid} [{test_id}] [outcome: {final_outcome}] [phase: {report_phase}]" + ) + + # Initialize XML structure if not already initialized + if not hasattr(item.config, "_xml_report"): + item.config._xml_report = ET.Element( + "testsuite", + { + "name": "pytest", + "errors": "0", + "failures": "0", + "skipped": "0", + "tests": "0", + "time": "0", + "timestamp": "", + "hostname": "", + }, + ) + + # Store the result of each phase (setup/call/teardown) + if not hasattr(item, "_results"): + item._results = {} + + # Save the outcome and other details per phase + item._results[report.when] = { + "outcome": final_outcome, + "longrepr": report.longrepr, + "duration": report.duration, + } + # Log failures + if report.when == "call" and report.failed: + logger = logging.getLogger() + logger.error(f"Test {report.nodeid} failed: {call.excinfo.value}") + + # Only create the XML element after the teardown phase + if report.when == "teardown" and not hasattr(item, "_xml_created"): + item._xml_created = True # Ensure XML creation happens only once + + # Determine final outcome based on the worst phase result + if "call" in item._results: + final_outcome = item._results["call"]["outcome"] + elif "setup" in item._results: + final_outcome = item._results["setup"]["outcome"] + else: + final_outcome = "skipped" + + # Create the XML element + testcase = ET.SubElement( + item.config._xml_report, + "testcase", + { + "classname": item.module.__name__, + "name": item.name, + "time": str(sum(result["duration"] for result in item._results.values())), + }, + ) + + # Add or tags based on the final outcome + if final_outcome == "Fail": + failure_message = item._results.get("call", {}).get( + "longrepr", item._results.get("setup", {}).get("longrepr", "Unknown Error") + ) + failure = ET.SubElement( + testcase, + "error", + { + "message": str(failure_message), + }, + ) + failure.text = str(failure_message) + elif final_outcome == "skipped": + skipped_message = item._results.get("setup", {}).get("longrepr", "Skipped") + skipped = ET.SubElement( + testcase, + "skipped", + { + "message": str(skipped_message), + }, + ) + skipped.text = str(skipped_message) + + # Update the testsuite summary statistics + tests = int(item.config._xml_report.attrib["tests"]) + 1 + item.config._xml_report.attrib["tests"] = str(tests) + if final_outcome == "Fail": + failures = int(item.config._xml_report.attrib["failures"]) + 1 + item.config._xml_report.attrib["failures"] = str(failures) + elif final_outcome == "skipped": + skipped = int(item.config._xml_report.attrib["skipped"]) + 1 + item.config._xml_report.attrib["skipped"] = str(skipped) + + +def pytest_sessionfinish(session, exitstatus): + """ + Operations to be performed after the test session is finished. + More functionalities to be added in this function in future. 
+ """ + cache_dir = os.path.join(session.config.rootdir, ".pytest_cache") + log.debug(f"\nClearing .pytest_cache directory at {cache_dir}") + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir, ignore_errors=False) + log.debug(f"Cleared .pytest_cache directory at {cache_dir}") + + +@pytest.fixture(scope="module") +def fx_federation(request, pytestconfig): + """ + Fixture for federation. This fixture is used to create the model owner, aggregator, and collaborators. + It also creates workspace. + Assumption: OpenFL workspace is present for the model being tested. + Args: + request: pytest request object. Model name is passed as a parameter to the fixture from test cases. + pytestconfig: pytest config object + Returns: + federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + + Note: As this is a module level fixture, thus no import is required at test level. + """ + log.info("Fixture for federation setup using Task Runner API on single machine.") + collaborators = [] + agg_domain_name = "localhost" + + # Parse the command line arguments + args = parse_arguments() + model_name = args.model_name + results_dir = args.results_dir or pytestconfig.getini("results_dir") + num_collaborators = args.num_collaborators + num_rounds = args.num_rounds + + # Validate the model name and create the workspace name + if not model_name.upper() in constants.ModelName._member_names_: + raise ValueError(f"Invalid model name: {model_name}") + + workspace_name = f"workspace_{model_name}" + + # Create model owner object and the workspace for the model + model_owner = participants.ModelOwner(workspace_name, model_name) + try: + workspace_path = model_owner.create_workspace(results_dir=results_dir) + except Exception as e: + log.error(f"Failed to create the workspace: {e}") + raise e + + # Modify and initialize the plan + try: + model_owner.modify_plan(new_rounds=num_rounds, num_collaborators=num_collaborators) + except Exception as e: + log.error(f"Failed to modify the plan: {e}") + raise e + + try: + model_owner.initialize_plan(agg_domain_name=agg_domain_name) + except Exception as e: + log.error(f"Failed to initialize the plan: {e}") + raise e + + # Modify and initialize the plan + try: + model_owner.certify_workspace() + except Exception as e: + log.error(f"Failed to certify the workspace: {e}") + raise e + + # Create the objects for aggregator and collaborators + aggregator = participants.Aggregator( + agg_domain_name=agg_domain_name, workspace_path=workspace_path + ) + + for i in range(num_collaborators): + collaborator = participants.Collaborator( + collaborator_name=f"collaborator{i+1}", + data_directory_path=i + 1, + workspace_path=workspace_path, + ) + collaborators.append(collaborator) + + # Return the federation fixture + return federation_fixture( + model_owner=model_owner, + aggregator=aggregator, + collaborators=collaborators, + model_name=model_name, + workspace_path=workspace_path, + results_dir=results_dir, + ) diff --git a/tests/end_to_end/models/participants.py b/tests/end_to_end/models/participants.py new file mode 100644 index 0000000000..0469868ea8 --- /dev/null +++ b/tests/end_to_end/models/participants.py @@ -0,0 +1,433 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +from datetime import datetime +import yaml +import logging + +import tests.end_to_end.utils.constants as constants +import tests.end_to_end.utils.subprocess_helper as sh + +log = logging.getLogger(__name__) + + +# Define the 
ModelOwner class +class ModelOwner: + """ + ModelOwner class to handle the model related operations. + Note: Aggregator can also act as a model owner. + This includes (non-exhaustive list): + 1. Creating the workspace - to create a workspace using given workspace and model names. + 2. Modifying based on input params provided and initializing the plan. + 3. Certifying the workspace and setting up the PKI. + 4. Importing and exporting the workspace etc. + """ + + def __init__(self, workspace_name, model_name): + """ + Initialize the ModelOwner class + Args: + workspace_name (str): Workspace name + model_name (str): Model name + """ + self.workspace_name = workspace_name + self.model_name = model_name + self.aggregator = None + self.collaborators = [] + self.workspace_path = None + self.plan_path = None + self.num_collaborators = constants.NUM_COLLABORATORS + self.rounds_to_train = constants.NUM_ROUNDS + + def create_workspace(self, results_dir=None): + """ + Create the workspace for the model + Args: + results_dir (str): Results directory path + Returns: + str: Path to the workspace + """ + try: + results_dir = results_dir if results_dir else os.getcwd() + return_code, _, error = sh.run_command( + f"fx workspace create --prefix {self.workspace_name} --template {self.model_name}", + work_dir=results_dir, + ) + if return_code != 0: + log.error(f"Failed to create the workspace: {error}") + raise Exception(f"Failed to create the workspace: {error}") + + log.info(f"Created the workspace {self.workspace_name} for the {self.model_name} model") + self.workspace_path = os.path.join(results_dir, self.workspace_name) + log.info(f"Workspace path: {self.workspace_path}") + except Exception as e: + log.error(f"Failed to create the workspace: {e}") + raise e + return self.workspace_path + + def get_workspace_path(self, results_dir, workspace_name): + """ + Get the workspace path + Args: + results_dir (str): Results directory path + workspace_name (str): Workspace name + Returns: + str: Path to the workspace + """ + workspace_path = os.path.join(results_dir, workspace_name) + log.info(f"Workspace path: {workspace_path}") + if os.path.exists(workspace_path): + self.workspace_path = workspace_path + log.info(f"Workspace path: {self.workspace_path}") + else: + log.error(f"Workspace {workspace_name} does not exist in {results_dir}") + raise FileNotFoundError(f"Workspace {workspace_name} does not exist in {results_dir}") + return self.workspace_path + + def certify_collaborator(self, collaborator_name): + """ + Sign the CSR for the collaborator + Args: + collaborator_name (str): Name of the collaborator + Returns: + bool: True if successful, else False + """ + try: + zip_name = f"col_{collaborator_name}_to_agg_cert_request.zip" + col_zip = os.path.join(os.getcwd(), self.workspace_path, zip_name) + return_code, output, error = sh.run_command( + f"fx collaborator certify --request-pkg {col_zip} -s", work_dir=self.workspace_path + ) + msg_received = [line for line in output if constants.SUCCESS_MARKER in line] + log.info(f"Message received: {msg_received}") + if return_code == 0 and len(msg_received): + log.info( + f"Successfully signed the CSR for the collaborator {collaborator_name} with zip path {col_zip}" + ) + else: + log.error(f"Failed to sign the CSR for collaborator {collaborator_name}: {error}") + + except Exception as e: + log.error(f"Failed to sign the CSR: {e}") + raise e + return True + + def modify_plan(self, new_rounds=None, num_collaborators=None): + """ + Modify the plan to train the model + Args: 
+ new_rounds (int): Number of rounds to train + num_collaborators (int): Number of collaborators + Returns: + bool: True if successful, else False + """ + self.plan_path = os.path.join(self.workspace_path, "plan", "plan.yaml") + log.info(f"Modifying the plan at {self.plan_path}") + # Open the file and modify the entries + self.rounds_to_train = new_rounds if new_rounds else self.rounds_to_train + self.num_collaborators = num_collaborators if num_collaborators else self.num_collaborators + + with open(self.plan_path) as fp: + data = yaml.load(fp, Loader=yaml.FullLoader) + + data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train) + data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators) + + with open(self.plan_path, "w+") as write_file: + yaml.dump(data, write_file) + + log.info( + f"Modified the plan to train the model for collaborators {self.num_collaborators} and {self.rounds_to_train} rounds" + ) + return True + + def initialize_plan(self, agg_domain_name): + """ + Initialize the plan + Args: + agg_domain_name (str): Aggregator domain name + Returns: + bool: True if successful, else False + """ + try: + log.info("Initializing the plan. It will take some time to complete..") + return_code, _, error = sh.run_command(f"fx plan initialize -a {agg_domain_name}", work_dir=self.workspace_path) + if return_code != 0: + log.error(f"Failed to initialize the plan: {error}") + raise Exception(f"Failed to initialize the plan: {error}") + + log.info(f"Initialized the plan for the workspace {self.workspace_name}") + except Exception as e: + log.error(f"Failed to initialize the plan: {e}") + raise e + return True + + def certify_workspace(self): + """ + Certify the workspace + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command("fx workspace certify", work_dir=self.workspace_path) + if return_code != 0: + log.error(f"Failed to certify the workspace: {error}") + raise Exception(f"Failed to certify the workspace: {error}") + + log.info(f"Certified the workspace {self.workspace_name}") + except Exception as e: + log.error(f"Failed to certify the workspace: {e}") + raise e + return True + + def certify_aggregator(self, agg_domain_name): + """ + Certify the aggregator request + Args: + agg_domain_name (str): Aggregator domain name + Returns: + bool: True if successful, else False + """ + log.info(f"CA should sign the aggregator request") + try: + return_code, _, error = sh.run_command( + f"fx aggregator certify --silent --fqdn {agg_domain_name}", + work_dir=self.workspace_path, + ) + if return_code != 0: + log.error(f"Failed to certify the aggregator request: {error}") + raise Exception(f"Failed to certify the aggregator request: {error}") + + log.info(f"CA signed the request from aggregator") + except Exception as e: + log.error(f"Failed to certify the aggregator request : {e}") + raise e + return True + + def export_workspace(self): + """ + Export the workspace + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command("fx workspace export", work_dir=self.workspace_path) + if return_code != 0: + log.error(f"Failed to export the workspace: {error}") + raise Exception(f"Failed to export the workspace: {error}") + + log.info(f"Exported the workspace") + except Exception as e: + log.error(f"Failed to export the workspace: {e}") + raise e + return True + + def import_workspace(self, workspace_zip): + """ + Import the workspace + Args: + workspace_zip (str): Path to the 
workspace zip file + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command( + f"fx workspace import --archive {workspace_zip}", work_dir=self.workspace_path + ) + if return_code != 0: + log.error(f"Failed to import the workspace: {error}") + raise Exception(f"Failed to import the workspace: {error}") + + log.info(f"Imported the workspace") + except Exception as e: + log.error(f"Failed to import the workspace: {e}") + raise e + return True + + +# Define the Aggregator class +class Aggregator: + """ + Aggregator class to handle the aggregator operations. + This includes (non-exhaustive list): + 1. Generating the sign request + 2. Starting the aggregator + """ + + def __init__(self, agg_domain_name=None, workspace_path=None): + """ + Initialize the Aggregator class + """ + self.name = "aggregator" + self.agg_domain_name = agg_domain_name + self.workspace_path = workspace_path + + def generate_sign_request(self): + """ + Generate a sign request for the aggregator + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command( + f"fx aggregator generate-cert-request --fqdn {self.agg_domain_name}", + work_dir=self.workspace_path, + ) + if return_code != 0: + log.error(f"Failed to generate the sign request: {error}") + raise Exception(f"Failed to generate the sign request: {error}") + + log.info(f"Generated a sign request for {self.name}") + except Exception as e: + log.error(f"Failed to generate the sign request: {e}") + raise e + return True + + def start(self): + """ + Start the aggregator + Returns: + str: Path to the log file + """ + try: + log.info(f"Starting {self.name}") + curr_time = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{self.name}_{curr_time}.log" + res_file = os.path.join(os.getcwd(), self.workspace_path, filename) + bg_file = open(res_file, "w", buffering=1) + + sh.run_command_background( + "fx aggregator start", + work_dir=self.workspace_path, + redirect_to_file=bg_file, + check_sleep=60, + ) + log.info( + f"Started {self.name} and tracking the logs at {os.path.join(self.workspace_path, filename)}" + ) + except Exception as e: + log.error(f"Failed to start the aggregator: {e}") + res_file.close() + raise e + return res_file + + +# Define the Collaborator class +class Collaborator: + """ + Collaborator class to handle the collaborator operations. + This includes (non-exhaustive list): + 1. Generating the sign request + 2. Creating the collaborator + 3. Importing and certifying the CSR + 4. 
Starting the collaborator + """ + + def __init__(self, collaborator_name=None, data_directory_path=None, workspace_path=None): + """ + Initialize the Collaborator class + """ + self.name = collaborator_name + self.collaborator_name = collaborator_name + self.data_directory_path = data_directory_path + self.workspace_path = workspace_path + + def generate_sign_request(self): + """ + Generate a sign request for the collaborator + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command( + f"fx collaborator generate-cert-request -n {self.collaborator_name}", + work_dir=self.workspace_path, + ) + if return_code != 0: + log.error(f"Failed to generate the sign request: {error}") + raise Exception(f"Failed to generate the sign request: {error}") + + log.info(f"Generated a sign request for {self.collaborator_name}") + except Exception as e: + log.error(f"Failed to generate the sign request: {e}") + raise e + return True + + def create_collaborator(self): + """ + Create the collaborator + Returns: + bool: True if successful, else False + """ + try: + return_code, _, error = sh.run_command( + f"fx collaborator create -n {self.collaborator_name} -d {self.data_directory_path}", + work_dir=self.workspace_path, + ) + if return_code != 0: + log.error(f"Failed to create the collaborator: {error}") + raise Exception(f"Failed to create the collaborator: {error}") + log.info( + f"Created {self.collaborator_name} with the data directory {self.data_directory_path}" + ) + except Exception as e: + log.error(f"Failed to create the collaborator: {e}") + raise e + return True + + def import_pki(self): + """ + Import and certify the CSR for the collaborator + Returns: + bool: True if successful, else False + """ + try: + zip_name = f"agg_to_col_{self.collaborator_name}_signed_cert.zip" + col_zip = os.path.join(os.getcwd(), self.workspace_path, zip_name) + return_code, output, error = sh.run_command( + f"fx collaborator certify --import {col_zip}", work_dir=self.workspace_path + ) + msg_received = [line for line in output if constants.SUCCESS_MARKER in line] + log.info(f"Message received: {msg_received}") + if return_code == 0 and len(msg_received): + log.info( + f"Successfully imported and certified the CSR for {self.collaborator_name} with zip path {col_zip}" + ) + else: + log.error( + f"Failed to import and certify the CSR for {self.collaborator_name}: {error}" + ) + + except Exception as e: + log.error(f"Failed to import and certify the CSR: {e}") + raise e + return True + + def start(self): + """ + Start the collaborator + Returns: + str: Path to the log file + """ + try: + log.info(f"Starting {self.collaborator_name}") + curr_time = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{self.collaborator_name}_{curr_time}.log" + res_file = os.path.join(os.getcwd(), self.workspace_path, filename) + bg_file = open(res_file, "w", buffering=1) + + sh.run_command_background( + f"fx collaborator start -n {self.collaborator_name}", + work_dir=self.workspace_path, + redirect_to_file=bg_file, + check_sleep=60, + ) + log.info( + f"Started {self.collaborator_name} and tracking the logs at {os.path.join(self.workspace_path, filename)}" + ) + except Exception as e: + log.error(f"Failed to start the collaborator: {e}") + res_file.close() + raise e + return res_file diff --git a/tests/end_to_end/pytest.ini b/tests/end_to_end/pytest.ini new file mode 100644 index 0000000000..8d18441dd6 --- /dev/null +++ b/tests/end_to_end/pytest.ini @@ -0,0 +1,12 @@ +[pytest] +addopts = -ra -q -s 
--junitxml=results/results.xml +testpaths = test_suites +junit_family = xunit2 +results_dir = results +log_level = INFO +markers = + torch_cnn_mnist: mark a test as a torch CNN MNIST test. + keras_cnn_mnist: mark a test as a Keras CNN MNIST test. + torch_cnn_histology: mark a test as a torch CNN histology test. +asyncio_mode=auto +asyncio_default_fixture_loop_scope="function" diff --git a/tests/end_to_end/test_suites/sample_tests.py b/tests/end_to_end/test_suites/sample_tests.py new file mode 100644 index 0000000000..7c528277e8 --- /dev/null +++ b/tests/end_to_end/test_suites/sample_tests.py @@ -0,0 +1,35 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import logging + +from tests.end_to_end.utils import federation_helper as fed_helper + +log = logging.getLogger(__name__) + +# ** IMPORTANT **: This is just an example of how to add a test with the pre-requisites below. +# Task Runner API test function for a federation run using sample_model: +# 1. Create the OpenFL workspace, if not already present for the model, and add the relevant dataset and its path in plan/data.yaml +# 2. Append the model name to the ModelName enum in tests/end_to_end/utils/constants.py +# 3. Add the model name as a marker in tests/end_to_end/pytest.ini, if not present +# 4. Use the fx_federation fixture in the test function - it will provide the federation object. +# 5. The fixture will contain - model_owner, aggregator, collaborators, model_name, workspace_path, results_dir +# 6. Set up PKI for trusted communication within the federation. +# 7. Start the federation using the aggregator and the given number of collaborators. +# 8. Verify the completion of the federation run. + +@pytest.mark.sample_model +def test_sample_model(fx_federation): + """ + Sample test: set up PKI, run the federation and verify its completion using the fx_federation fixture. + """ + log.info(f"Running sample model test {fx_federation.model_name}") + # Setup PKI for trusted communication within the federation + assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI" + + # Start the federation + results = fed_helper.run_federation(fx_federation) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py new file mode 100644 index 0000000000..a80c583acf --- /dev/null +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -0,0 +1,57 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import logging + +from tests.end_to_end.utils import federation_helper as fed_helper + +log = logging.getLogger(__name__) + + +@pytest.mark.torch_cnn_mnist +def test_torch_cnn_mnist(fx_federation): + """ + Test for torch_cnn_mnist model.
+ """ + log.info("Testing torch_cnn_mnist model") + + # Setup PKI for trusted communication within the federation + assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" + + # Start the federation + results = fed_helper.run_federation(fx_federation) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + + +@pytest.mark.keras_cnn_mnist +def test_keras_cnn_mnist(fx_federation): + log.info("Testing keras_cnn_mnist model") + + # Setup PKI for trusted communication within the federation + assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" + + # Start the federation + results = fed_helper.run_federation(fx_federation) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + + +@pytest.mark.torch_cnn_histology +def test_torch_cnn_histology(fx_federation): + """ + Test for torch_cnn_histology model + """ + log.info("Testing torch_cnn_histology model") + + # Setup PKI for trusted communication within the federation + assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" + + # Start the federation + results = fed_helper.run_federation(fx_federation) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" diff --git a/tests/end_to_end/utils/conftest_helper.py b/tests/end_to_end/utils/conftest_helper.py new file mode 100644 index 0000000000..490a3316db --- /dev/null +++ b/tests/end_to_end/utils/conftest_helper.py @@ -0,0 +1,36 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import sys +import logging + +log = logging.getLogger(__name__) + + +def parse_arguments(): + """ + Parse command line arguments to provide the required parameters for running the tests. + + Returns: + argparse.Namespace: Parsed command line arguments with the following attributes: + - results_dir (str, optional): Directory to store the results + - num_collaborators (int, default=2): Number of collaborators + - num_rounds (int, default=5): Number of rounds to train + - model_name (str, default="torch_cnn_mnist"): Model name + + Raises: + SystemExit: If the required arguments are not provided or if any argument parsing error occurs. + """ + try: + parser = argparse.ArgumentParser(description="Provide the required arguments to run the tests") + parser.add_argument("--results_dir", type=str, required=False, help="Directory to store the results") + parser.add_argument("--num_collaborators", type=int, default=2, help="Number of collaborators") + parser.add_argument("--num_rounds", type=int, default=5, help="Number of rounds to train") + parser.add_argument("--model_name", type=str, default="torch_cnn_mnist", help="Model name") + args = parser.parse_known_args()[0] + return args + + except Exception as e: + log.error(f"Failed to parse arguments: {e}") + sys.exit(1) diff --git a/tests/end_to_end/utils/constants.py b/tests/end_to_end/utils/constants.py new file mode 100644 index 0000000000..0b724c7ced --- /dev/null +++ b/tests/end_to_end/utils/constants.py @@ -0,0 +1,21 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from enum import Enum + +# Define the model names. 
This is a non exhaustive list of models that can be used in the tests +class ModelName(Enum): + """ + Enum class to define the model names. + """ + # IMP - The model name must be same (and in uppercase) as the model value. + # This is used to identify the model in the tests. + TORCH_CNN_MNIST = "torch_cnn_mnist" + KERAS_CNN_MNIST = "keras_cnn_mnist" + TORCH_CNN_HISTOLOGY = "torch_cnn_histology" + +NUM_COLLABORATORS = 2 +NUM_ROUNDS = 5 +WORKSPACE_NAME = "my_federation" +DEFAULT_MODEL_NAME = "torch_cnn_mnist" +SUCCESS_MARKER = "✔️ OK" diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py new file mode 100644 index 0000000000..a4addbc49f --- /dev/null +++ b/tests/end_to_end/utils/federation_helper.py @@ -0,0 +1,134 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import time +import concurrent.futures +import logging + +from tests.end_to_end.utils.constants import SUCCESS_MARKER + +log = logging.getLogger(__name__) + + +def setup_pki(fed_obj): + """ + Setup PKI for trusted communication within the federation + + Args: + fed_obj (object): Federation fixture object + Returns: + bool: True if successful, else False + """ + success = False + # Aggregator and model owner operations + try: + log.info(f"Performing operations for {fed_obj.aggregator.name}") + fed_obj.aggregator.generate_sign_request() + fed_obj.model_owner.certify_aggregator(fed_obj.aggregator.agg_domain_name) + except Exception as e: + log.error(f"Failed to perform aggregator operations: {e}") + raise e + + # Collaborator and model owner operations + for collaborator in fed_obj.collaborators: + try: + log.info(f"Performing operations for {collaborator.collaborator_name}") + collaborator.create_collaborator() + collaborator.generate_sign_request() + # Below step will add collaborator entries in cols.yaml file. + fed_obj.model_owner.certify_collaborator(collaborator.collaborator_name) + collaborator.import_pki() + except Exception as e: + log.error(f"Failed to perform collaborator operations: {e}") + raise e + success = True + + log.info("CSR operations completed successfully for all participants") + return success + + +def run_federation(fed_obj): + """ + Start the federation + Args: + fed_obj (object): Federation fixture object + Returns: + list: List of response files for all the participants + """ + executor = concurrent.futures.ThreadPoolExecutor() + # As the collaborators will wait for aggregator to start, we need to start them in parallel. + futures = [ + executor.submit( + participant.start + ) + for participant in fed_obj.collaborators + [fed_obj.aggregator] + ] + + # Result will contain response files for all the participants. + results = [f.result() for f in futures] + return results + + +def verify_federation_run_completion(fed_obj, results): + """ + Verify the completion of the process for all the participants + Args: + fed_obj (object): Federation fixture object + results (list): List of results + Returns: + list: List of response (True or False) for all the participants + """ + log.info("Verifying the completion of the process for all the participants") + # Start the collaborators and aggregator + executor = concurrent.futures.ThreadPoolExecutor() + # As the collaborators will wait for aggregator to start, we need to start them in parallel. 
+ futures = [ + executor.submit( + _verify_completion_for_participant, + participant, + results[i] + ) + for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator]) + ] + + # Results will contain True/False for each participant based on the completion check. + results = [f.result() for f in futures] + log.info(f"Results: {results}") + + # If any of the participants failed, return False; else return True + return all(results) + + +def _verify_completion_for_participant(participant, result_file): + """ + Verify the completion of the process for the participant + Args: + participant (object): Participant object + result_file (str): Result file + Returns: + bool: True if successful, else False + """ + # Wait for the success message to appear in the log until the timeout + timeout = 900 # in seconds + log.info(f"Printing the last line of the log file for {participant.name} to track the progress") + with open(result_file, 'r') as file: + content = file.read() + start_time = time.time() + while ( + SUCCESS_MARKER not in content and time.time() - start_time < timeout + ): + with open(result_file, 'r') as file: + content = file.read() + # Print the last line of the log file on screen to track the progress + log.info(f"{participant.name}: {content.splitlines()[-1:]}") + if SUCCESS_MARKER in content: + break + log.info(f"Process is yet to complete for {participant.name}") + time.sleep(45) + + if SUCCESS_MARKER not in content: + log.error(f"Process failed/is incomplete for {participant.name} after timeout of {timeout} seconds") + return False + else: + log.info(f"Process completed for {participant.name} in {time.time() - start_time} seconds") + return True diff --git a/tests/end_to_end/utils/logger.py b/tests/end_to_end/utils/logger.py new file mode 100644 index 0000000000..b3e9d95311 --- /dev/null +++ b/tests/end_to_end/utils/logger.py @@ -0,0 +1,38 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging + +# Get the logger instance configured in conftest.py +logger = logging.getLogger() + + +def configure_logging(log_file, log_level): + """ + Configures logging for the application. + + This function sets up logging to a specified file and the console with the given log level. + It formats the log messages to include the timestamp, log level, filename, + function name, and the actual log message. + + Args: + log_file (str): Path to the log file. + log_level (int): Logging level (e.g., logging.DEBUG, logging.INFO). + + Raises: + OSError: If there is an issue with creating the log file handler.
+ """ + formatter = logging.Formatter( + "\n%(asctime)s - %(levelname)s: [%(filename)s - %(funcName)s]: %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + handler.setLevel(log_level) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + console_handler.setLevel(log_level) + logger = logging.getLogger() + logger.setLevel(log_level) + logger.addHandler(handler) + logger.addHandler(console_handler) diff --git a/tests/end_to_end/utils/subprocess_helper.py b/tests/end_to_end/utils/subprocess_helper.py new file mode 100644 index 0000000000..ec09412762 --- /dev/null +++ b/tests/end_to_end/utils/subprocess_helper.py @@ -0,0 +1,127 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import subprocess +import time +import traceback +import logging + +log = logging.getLogger(__name__) + + +def run_command_background( + cmd, return_error=False, print_stdout=False, work_dir=None, redirect_to_file=None, check_sleep=1 +): + """Execute a command and let it run in background. + + Args: + cmd (Union[str, list]): Command to execute. + Can be a shell type string or a list of command and args. + e.g. ['ps', '-ef'], ['/bin/bash/', script.sh], './script.sh' + return_error: Whether to return error message. This has no effect. + print_stdout: If True and the process completes immediately, print the stdout. + This is obsolete. Will always print debug output and errors. + Output will be truncated to 10 lines. + work_dir: Directory from which to run the command. Current directory if None. + redirect_to_file: The file descriptor to which the STDERR and STDOUT will be written. + check_sleep: Time in seconds to sleep before polling to make sure + the background process is still running. + + Returns: + Popen object of the subprocess. None, if the command completed immediately. + """ + if isinstance(cmd, list): + shell = False + else: + shell = True + + if redirect_to_file: + output_redirect = redirect_to_file + error_redirect = subprocess.STDOUT + else: + output_redirect = subprocess.PIPE + error_redirect = subprocess.PIPE + process = subprocess.Popen( + cmd, stdout=output_redirect, stderr=error_redirect, shell=shell, text=True, cwd=work_dir + ) + time.sleep(check_sleep) + return_code = process.poll() + if return_code is None: + return process + elif return_code != 0: + if redirect_to_file: + log.info( + "The background process has been writing STDERR and STDOUT to a file passed in as 'redirect_to_file' arg" + ) + else: + error = process.stderr.read().rstrip("\n") + log.warning(f"Error is: {error}") + log.error(f"Error Traceback: {traceback.print_exc()}") + raise subprocess.CalledProcessError(returncode=return_code, cmd=cmd) + else: + log.warning("Process for Command completed instantly.") + if redirect_to_file: + log.info( + "The background process has been writing STDERR and STDOUT to a file passed in as 'redirect_to_file' arg" + ) + else: + output = process.stdout.read().rstrip("\n").split("\n") + if print_stdout and output is not None: + log.info(f"Command to run - {cmd} output - {output}") + return None + + +def run_command( + cmd, return_error=True, print_stdout=False, work_dir=None, timeout=None, check=True +): + """ + Execute the command using subprocess and log the output to logger. + + Args: + cmd (str or list): The command to run. + return_error (bool): Whether to return errors or raise them. + print_stdout (bool): Whether to print the standard output. 
+ work_dir (str): The working directory for the command. + timeout (int): The timeout in seconds for the command to complete. + check (bool): Whether to check for errors after command execution. + + Returns: + tuple: (return_code, output, error) + """ + if isinstance(cmd, list): + shell = False + else: + shell = True + + try: + result = subprocess.run( + cmd, capture_output=True, shell=shell, text=True, cwd=work_dir, check=check, timeout=timeout + ) + except subprocess.CalledProcessError as e: + log.error(f"Command '{cmd}' failed with return code {e.returncode}") + log.error(f"Error output: {e.stderr}") + if not return_error: + raise + return e.returncode, [], [e.stderr] + except Exception as e: + log.error(f"Failed to execute command '{cmd}': {str(e)}") + log.error(f"Error Traceback: {traceback.format_exc()}") + if not return_error: + raise + return -1, [], [str(e)] + + output = result.stdout.splitlines() + error = result.stderr.splitlines() + + if result.returncode == 0: + log.info(f"Successfully ran command: {cmd}") + if print_stdout: + log.info(f"Command output: {result.stdout}") + else: + log.error(f"Subprocess command '{cmd}' returned non-zero return_code [{result.returncode}]:") + log.error(f"stderr: {result.stderr}") + log.error(f"stdout: {result.stdout}") + if not return_error: + raise subprocess.CalledProcessError(returncode=result.returncode, cmd=cmd, stderr=result.stderr) + + return result.returncode, output, error diff --git a/tests/end_to_end/utils/xml_helper.py b/tests/end_to_end/utils/xml_helper.py new file mode 100644 index 0000000000..b3812acc3c --- /dev/null +++ b/tests/end_to_end/utils/xml_helper.py @@ -0,0 +1,75 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import xml.etree.ElementTree as ET +from lxml import etree +import os + +# Initialize the XML parser +parser = etree.XMLParser(recover=True, encoding='utf-8') +tree = ET.parse("results/results.xml", parser=parser) + +# Get the root element +testsuites = tree.getroot() + + +def get_test_status(result): + """ + Get the test status/verdict + Args + result: the result object to check` + Returns + status of the test status + """ + status = "FAILED" + if "failure" in result.tag or "error" in result.tag: + # If the result has a tag "failure", set status as "FAIL" + status = "FAILED" + elif "skipped" in result.tag: + # If the result has a tag "skipped", set status as "SKIPPED" + status = "SKIPPED" + else: + status = "PASSED" + return status + + +def get_testcase_result(): + """ + Get the test case results from the XML file + """ + database_list = [] + status = None + # Iterate over each testsuite in testsuites + for testsuite in testsuites: + # Populate testcase details in a dictionary + for testcase in testsuite: + database_dict = {} + if testcase.attrib.get("name"): + database_dict["name"] = testcase.attrib.get("name") + database_dict["time"] = testcase.attrib.get("time") + + # Successful test won't have any result/subtag + if len(testcase) == 0: + database_dict["result"] = "PASSED" + + # Iterate over each result in testsuite + for result in testcase: + status = get_test_status(result) + database_dict["result"] = status + + # Append the dictionary to database_list + database_list.append(database_dict) + status = None + + return database_list + + +result = get_testcase_result() + +# Write the results to GitHub step summary +with open(os.getenv('GITHUB_STEP_SUMMARY'), 'a') as fh: + # DO NOT change the print statements + print("| Name | Time (in seconds) | Result |", file=fh) + 
print("| ------------- | ------------- | ------------- |", file=fh) + for item in result: + print(f"| {item['name']} | {item['time']} | {item['result']} |", file=fh)