From d8f0aaf3cd3ec0745fddecbd85c16441e8072710 Mon Sep 17 00:00:00 2001 From: Noopur Date: Mon, 10 Feb 2025 10:13:58 +0530 Subject: [PATCH] Task Runner E2E - Added Resiliency Tests (#1347) * Task Runner E2E - Added Resiliency Tests Signed-off-by: noopur * Correction in collaborator_memory_usage_file assignment Signed-off-by: noopur * Enabled option to select a job in DWS workflow Signed-off-by: noopur * Temp added resiliency to existing wf for testing Signed-off-by: noopur * Temp added resiliency to existing wf for testing Signed-off-by: noopur * Some logs moved from info to debug Signed-off-by: noopur * Skip workspace.tar from artifacts due to its size in GBs Signed-off-by: noopur * Removed resiliency tests from existing workflows Signed-off-by: noopur * Code format check Signed-off-by: noopur * Assert increase in current round after restart + wait Signed-off-by: noopur * Reverted mandatory check for model_name from conftest.py Signed-off-by: noopur * All review comments incorporated Signed-off-by: noopur * Missed adding model_name back in summary help Signed-off-by: noopur * Missed adding model_name back in summary help Signed-off-by: noopur * Set best score as 'Not Found' if tensor.db file is not present Signed-off-by: noopur * Specific error msg when db fetch fails Signed-off-by: noopur * Handle FedEval case in docker Signed-off-by: noopur * Check total rounds during round increment check Signed-off-by: noopur * Handle multiple / in the model names Signed-off-by: noopur * Skip dockerized resiliency Signed-off-by: noopur --------- Signed-off-by: noopur Co-authored-by: Payal Chaurasiya --- .github/actions/tr_post_test_run/action.yml | 6 +- .github/workflows/task_runner_basic_e2e.yml | 34 +- .../task_runner_dockerized_ws_e2e.yml | 55 ++- .../workflows/task_runner_e2e_resiliency.yml | 159 +++++++ .../workflows/task_runner_fedeval_dws_e2e.yml | 18 +- .github/workflows/task_runner_fedeval_e2e.yml | 18 +- .../workflows/test_task_runner_basic_e2e.yml | 416 ++++++++++++++++++ .../test_task_runner_dockerized_ws_e2e.yml | 347 +++++++++++++++ tests/end_to_end/conftest.py | 18 - tests/end_to_end/models/aggregator.py | 17 +- tests/end_to_end/models/collaborator.py | 33 +- tests/end_to_end/models/model_owner.py | 2 +- .../test_suites/memory_logs_tests.py | 10 +- .../test_suites/task_runner_tests.py | 16 +- .../test_suites/tr_resiliency_tests.py | 124 ++++++ .../test_suites/tr_with_fedeval_tests.py | 26 +- tests/end_to_end/utils/conftest_helper.py | 12 +- tests/end_to_end/utils/constants.py | 6 + tests/end_to_end/utils/db_helper.py | 48 +- tests/end_to_end/utils/docker_helper.py | 74 +++- tests/end_to_end/utils/exceptions.py | 20 + tests/end_to_end/utils/federation_helper.py | 182 ++++---- tests/end_to_end/utils/interruption_helper.py | 102 +++++ tests/end_to_end/utils/summary_helper.py | 41 +- tests/end_to_end/utils/tr_workspace.py | 19 +- 25 files changed, 1497 insertions(+), 306 deletions(-) create mode 100644 .github/workflows/task_runner_e2e_resiliency.yml create mode 100644 .github/workflows/test_task_runner_basic_e2e.yml create mode 100644 .github/workflows/test_task_runner_dockerized_ws_e2e.yml create mode 100644 tests/end_to_end/test_suites/tr_resiliency_tests.py create mode 100644 tests/end_to_end/utils/interruption_helper.py diff --git a/.github/actions/tr_post_test_run/action.yml b/.github/actions/tr_post_test_run/action.yml index c7ce11a8e1..fe1bdc1cf0 100644 --- a/.github/actions/tr_post_test_run/action.yml +++ b/.github/actions/tr_post_test_run/action.yml @@ -20,14 +20,14 @@ runs: echo 
"Test summary printed" shell: bash - - name: Create Tar (exclude cert and data folders) + - name: Create Tar (exclude folders - cert/data/__pycache__, files - tensor.db/workspace.tar) id: tar_files if: ${{ always() }} run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results + tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" --exclude="tensor.db" --exclude="workspace.tar" $HOME/results # Model name might contain forward slashes, convert them to underscore. tmp=${{ env.MODEL_NAME }} - echo "MODEL_NAME_MODIFIED=${tmp/\//_}" >> $GITHUB_ENV + echo "MODEL_NAME_MODIFIED=${tmp//\//_}" >> $GITHUB_ENV shell: bash - name: Upload Artifacts diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index 5430c5695c..438c9b5681 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -6,6 +6,9 @@ name: Task_Runner_E2E # Please do not modify the name as it is used in the comp on: schedule: - cron: "0 0 * * *" # Run every day at midnight + push: + branches: + - develop workflow_dispatch: inputs: num_rounds: @@ -44,8 +47,8 @@ on: options: - all - test_with_tls - - test_with_non_tls - - test_with_no_client_auth + - test_without_tls + - test_without_client_auth - test_memory_logs required: false @@ -65,7 +68,8 @@ jobs: input_selection: if: | (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || - (github.event_name == 'workflow_dispatch') + (github.event_name == 'workflow_dispatch') || + (github.event.pull_request.draft == false && contains(github.event.pull_request.labels.*.name, 'task_runner_e2e')) name: Input value selection runs-on: ubuntu-22.04 outputs: @@ -119,7 +123,7 @@ jobs: fi test_with_tls: - name: Test with TLS + name: With TLS needs: input_selection if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_tls' runs-on: ubuntu-22.04 @@ -159,12 +163,12 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_tls" + test_type: "With_TLS" - test_with_non_tls: - name: Test without TLS + test_without_tls: + name: Without TLS needs: input_selection - if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_non_tls' + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_without_tls' runs-on: ubuntu-22.04 timeout-minutes: 30 strategy: @@ -202,12 +206,12 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_non_tls" + test_type: "Without_TLS" - test_with_no_client_auth: - name: Test without client auth + test_without_client_auth: + name: Without Client Auth needs: input_selection - if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_no_client_auth' + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_without_client_auth' runs-on: ubuntu-22.04 timeout-minutes: 30 strategy: @@ -245,10 +249,10 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: 'tr_no_client_auth' + test_type: 'Without_Client_Auth' test_memory_logs: - name: Test memory usage + name: With Memory Logs needs: input_selection if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 
'test_memory_logs' runs-on: ubuntu-22.04 @@ -289,4 +293,4 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_tls_memory_logs" + test_type: "With_Memory_Logs" diff --git a/.github/workflows/task_runner_dockerized_ws_e2e.yml b/.github/workflows/task_runner_dockerized_ws_e2e.yml index 23febe3e64..01a66cff72 100644 --- a/.github/workflows/task_runner_dockerized_ws_e2e.yml +++ b/.github/workflows/task_runner_dockerized_ws_e2e.yml @@ -16,6 +16,17 @@ on: required: false default: "2" type: string + jobs_to_run: + description: "Jobs to run" + type: choice + default: "all" + options: + - all + - test_with_tls + - test_without_tls + - test_without_client_auth + - test_memory_logs + required: false permissions: contents: read @@ -24,10 +35,24 @@ permissions: env: NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + JOBS_TO_RUN: ${{ inputs.jobs_to_run || 'all' }} jobs: - test_with_tls_dockerized_ws: - name: tr_tls_dockerized_ws + input_selection: + name: Input value selection + runs-on: ubuntu-22.04 + outputs: + selected_jobs: ${{ steps.input_selection.outputs.jobs_to_run }} + steps: + - name: Job to select input values + id: input_selection + run: | + echo "jobs_to_run=${{ env.JOBS_TO_RUN }}" >> "$GITHUB_OUTPUT" + + test_with_tls: + name: With TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_tls' runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -65,10 +90,12 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_tls_dockerized_ws" + test_type: "With_TLS" - test_with_non_tls_dockerized_ws: - name: tr_non_tls_dockerized_ws + test_without_tls: + name: Without TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_without_tls' runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -106,10 +133,12 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_non_tls_dockerized_ws" + test_type: "Without_TLS" - test_with_no_client_auth_dockerized_ws: - name: tr_no_client_auth_dockerized_ws + test_without_client_auth: + name: Without Client Auth + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_without_client_auth' runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -147,10 +176,12 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_no_client_auth_dockerized_ws" + test_type: "Without_Client_Auth" - test_memory_logs_dockerized_ws: - name: tr_tls_memory_logs_dockerized_ws + test_memory_logs: + name: With Memory Logs + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_memory_logs' runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -189,4 +220,4 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_tls_memory_logs_dockerized_ws" + test_type: "With_Memory_Logs" diff --git a/.github/workflows/task_runner_e2e_resiliency.yml b/.github/workflows/task_runner_e2e_resiliency.yml new file mode 100644 index 0000000000..1c538a6298 --- /dev/null +++ b/.github/workflows/task_runner_e2e_resiliency.yml @@ -0,0 +1,159 @@ +--- +# Task Runner E2E tests for resiliency. 
It covers both native and dockerized environments.
+
+name: Task_Runner_E2E_Resiliency # Please do not modify the name as it is used in the composite action
+
+on:
+  schedule:
+    - cron: "0 5 * * *" # Run every day at 5 am UTC
+  push:
+    branches:
+      - develop
+  workflow_dispatch:
+    inputs:
+      num_rounds:
+        description: "Number of rounds to train"
+        required: false
+        default: "50"
+        type: string
+      num_collaborators:
+        description: "Number of collaborators"
+        required: false
+        default: "2"
+        type: string
+      model_name:
+        description: "Model name"
+        required: false
+        default: "all"
+        type: choice
+        options:
+          - all
+          - torch/mnist
+          - keras/mnist
+      python_version:
+        description: "Python version"
+        required: false
+        default: "3.10"
+        type: choice
+        options:
+          - "3.10"
+          - "3.11"
+          - "3.12"
+
+permissions:
+  contents: read
+
+# Environment variables common for all the jobs
+# DO NOT use double quotes for the values of the environment variables
+env:
+  NUM_ROUNDS: ${{ inputs.num_rounds || 50 }}
+  NUM_COLLABORATORS: ${{ inputs.num_collaborators || 2 }}
+  MODEL_NAME: ${{ inputs.model_name || 'all' }}
+  PYTHON_VERSION: ${{ inputs.python_version || '3.10' }}
+
+jobs:
+  input_selection:
+    if: |
+      (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') ||
+      (github.event_name == 'workflow_dispatch') ||
+      (github.event.pull_request.draft == false && contains(github.event.pull_request.labels.*.name, 'task_runner_e2e'))
+    name: Input value selection
+    runs-on: ubuntu-22.04
+    outputs:
+      # Output all the variables related to models and python versions to be used in the matrix strategy
+      # for different jobs, however their usage depends on the selected job.
+      selected_models_for_tls: ${{ steps.input_selection.outputs.models_for_tls }}
+    steps:
+      - name: Job to select input values
+        id: input_selection
+        run: |
+          if [ "${{ env.MODEL_NAME }}" == "all" ]; then
+            echo "models_for_tls=[\"torch/mnist\", \"keras/mnist\"]" >> "$GITHUB_OUTPUT"
+          else
+            echo "models_for_tls=[\"${{env.MODEL_NAME}}\"]" >> "$GITHUB_OUTPUT"
+          fi
+
+  resiliency_in_native:
+    name: With TLS (Native)
+    needs: input_selection
+    runs-on: ubuntu-22.04
+    timeout-minutes: 30
+    strategy:
+      matrix:
+        model_name: ${{ fromJson(needs.input_selection.outputs.selected_models_for_tls) }}
+      fail-fast: false # do not immediately fail if one of the combinations fails
+
+    env:
+      MODEL_NAME: ${{ matrix.model_name }}
+
+    steps:
+      - name: Checkout OpenFL repository
+        id: checkout_openfl
+        uses: actions/checkout@v4.1.1
+        with:
+          fetch-depth: 2 # needed for detecting changes
+          submodules: "true"
+          token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Pre test run
+        uses: ./.github/actions/tr_pre_test_run
+        if: ${{ always() }}
+
+      - name: Run Task Runner E2E tests with TLS
+        id: run_tests
+        run: |
+          python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \
+            -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \
+            --num_collaborators ${{ env.NUM_COLLABORATORS }} --num_rounds ${{ env.NUM_ROUNDS }}
+          echo "Task runner end to end test run completed"
+
+      - name: Post test run
+        uses: ./.github/actions/tr_post_test_run
+        if: ${{ always() }}
+        with:
+          test_type: "Resiliency_Native"
+
+  # TODO - Once we have GitHub runners with higher configurations, we can enable this job.
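+
+  # For local debugging, the same suite can be exercised outside CI with the
+  # pytest command used by the job above (sketch; assumes an activated OpenFL
+  # development environment and default inputs):
+  #
+  #   python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \
+  #     -m task_runner_basic --model_name torch/mnist \
+  #     --num_collaborators 2 --num_rounds 50
+  #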
+ # resiliency_in_dws: + # name: With TLS (Dockerized) + # needs: input_selection + # runs-on: ubuntu-22.04 + # timeout-minutes: 30 + # strategy: + # matrix: + # # Dockerized WS for other models require higher config runners. + # # Once the issue is fixed, we can enable the tests for other models as well. + # model_name: ["keras/mnist"] + # fail-fast: false # do not immediately fail if one of the combinations fail + + # env: + # MODEL_NAME: ${{ matrix.model_name }} + # PYTHON_VERSION: ${{ matrix.python_version }} + + # steps: + # - name: Checkout OpenFL repository + # id: checkout_openfl + # uses: actions/checkout@v4.1.1 + # with: + # fetch-depth: 2 # needed for detecting changes + # submodules: "true" + # token: ${{ secrets.GITHUB_TOKEN }} + + # - name: Pre test run + # uses: ./.github/actions/tr_pre_test_run + # if: ${{ always() }} + + # - name: Run Task Runner E2E tests with TLS + # id: run_tests + # run: | + # python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + # -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + # --num_collaborators ${{ env.NUM_COLLABORATORS }} --num_rounds ${{ env.NUM_ROUNDS }} + # echo "Task runner end to end test run completed" + + # - name: Post test run + # uses: ./.github/actions/tr_post_test_run + # if: ${{ always() }} + # with: + # test_type: "Resiliency_Dockerized" diff --git a/.github/workflows/task_runner_fedeval_dws_e2e.yml b/.github/workflows/task_runner_fedeval_dws_e2e.yml index f0d62ae278..0c6e4f429f 100644 --- a/.github/workflows/task_runner_fedeval_dws_e2e.yml +++ b/.github/workflows/task_runner_fedeval_dws_e2e.yml @@ -51,8 +51,8 @@ jobs: run: | echo "jobs_to_run=${{ env.JOBS_TO_RUN }}" - test_with_tls_dockerized_ws: - name: tr_tls_dockerized_ws + test_with_tls: + name: With TLS needs: input_selection runs-on: ubuntu-22.04 timeout-minutes: 15 @@ -92,10 +92,10 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_tls_dockerized_ws" + test_type: "With_TLS" - test_with_non_tls_dockerized_ws: - name: tr_non_tls_dockerized_ws + test_without_tls: + name: Without TLS needs: input_selection runs-on: ubuntu-22.04 timeout-minutes: 15 @@ -135,10 +135,10 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_non_tls_dockerized_ws" + test_type: "Without_TLS" - test_with_no_client_auth_dockerized_ws: - name: tr_no_client_auth_dockerized_ws + test_without_client_auth: + name: Without Client Auth needs: input_selection runs-on: ubuntu-22.04 timeout-minutes: 15 @@ -178,4 +178,4 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_no_client_auth_dockerized_ws" \ No newline at end of file + test_type: "Without_Client_Auth" diff --git a/.github/workflows/task_runner_fedeval_e2e.yml b/.github/workflows/task_runner_fedeval_e2e.yml index f3c08e79ad..36ea717078 100644 --- a/.github/workflows/task_runner_fedeval_e2e.yml +++ b/.github/workflows/task_runner_fedeval_e2e.yml @@ -5,7 +5,7 @@ name: Task_Runner_FedEval_E2E # Please do not modify the name as it is used in on: schedule: - - cron: "0 0 * * *" # Run every day at midnight + - cron: "0 7 * * *" # Run every day at 7 am UTC workflow_dispatch: inputs: num_rounds: @@ -29,7 +29,7 @@ env: jobs: test_with_tls: - name: tr_tls + name: With TLS if: | (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || (github.event_name == 'workflow_dispatch') @@ -72,10 +72,10 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - 
test_type: "tr_tls" + test_type: "With_TLS" - test_with_non_tls: - name: tr_non_tls + test_without_tls: + name: Without TLS if: | (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || (github.event_name == 'workflow_dispatch') @@ -118,10 +118,10 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_non_tls" + test_type: "Without_TLS" - test_with_no_client_auth: - name: tr_no_client_auth + test_without_client_auth: + name: Without Client Auth if: | (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || (github.event_name == 'workflow_dispatch') @@ -164,4 +164,4 @@ jobs: uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - test_type: "tr_no_client_auth" + test_type: "Without_Client_Auth" diff --git a/.github/workflows/test_task_runner_basic_e2e.yml b/.github/workflows/test_task_runner_basic_e2e.yml new file mode 100644 index 0000000000..96167530ed --- /dev/null +++ b/.github/workflows/test_task_runner_basic_e2e.yml @@ -0,0 +1,416 @@ +--- +# Task Runner E2E tests for bare metal approach + +name: Task_Runner_E2E # Please do not modify the name as it is used in the composite action + +on: + schedule: + - cron: "0 0 * * *" # Run every day at midnight + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + model_name: + description: "Model name" + required: false + default: "all" + type: choice + options: + - all + - torch/mnist + - keras/mnist + python_version: + description: "Python version" + required: false + default: "all" + type: choice + options: + - all + - "3.10" + - "3.11" + - "3.12" + jobs_to_run: + description: "Jobs to run" + type: choice + default: "all" + options: + - all + - test_with_tls + - test_with_non_tls + - test_with_no_client_auth + - test_memory_logs + required: false + +permissions: + contents: read + +# Environment variables common for all the jobs +# DO NOT use double quotes for the values of the environment variables +env: + NUM_ROUNDS: ${{ inputs.num_rounds || 5 }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || 2 }} + MODEL_NAME: ${{ inputs.model_name || 'all' }} + PYTHON_VERSION: ${{ inputs.python_version || 'all' }} + JOBS_TO_RUN: ${{ inputs.jobs_to_run || 'all' }} + +jobs: + input_selection: + if: | + (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || + (github.event_name == 'workflow_dispatch') + name: Input value selection + runs-on: ubuntu-22.04 + outputs: + # Output all the variables related to models and python versions to be used in the matrix strategy + # for different jobs, however their usage depends on the selected job. 
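+      # For example, with MODEL_NAME=all the step below emits
+      #   models_for_tls=["torch/mnist", "keras/mnist"]
+      # and with PYTHON_VERSION=all it emits
+      #   python_for_tls=["3.10", "3.11", "3.12"];
+      # a job then consumes these via fromJson(needs.input_selection.outputs.selected_models_for_tls).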
+ selected_jobs: ${{ steps.input_selection.outputs.jobs_to_run }} + selected_models_for_tls: ${{ steps.input_selection.outputs.models_for_tls }} + selected_python_for_tls: ${{ steps.input_selection.outputs.python_for_tls }} + selected_models_for_non_tls: ${{ steps.input_selection.outputs.models_for_non_tls }} + selected_models_for_no_client_auth: ${{ steps.input_selection.outputs.models_for_no_client_auth }} + selected_models_for_memory_logs: ${{ steps.input_selection.outputs.models_for_memory_logs }} + selected_python_for_non_tls: ${{ steps.input_selection.outputs.python_for_non_tls }} + selected_python_for_no_client_auth: ${{ steps.input_selection.outputs.python_for_no_client_auth }} + selected_python_for_memory_logs: ${{ steps.input_selection.outputs.python_for_memory_logs }} + steps: + - name: Job to select input values + id: input_selection + run: | + # --------------------------------------------------------------- + # Models like XGBoost (xgb_higgs) and torch/histology require runners with higher memory and CPU to run. + # Thus these models are excluded from the matrix for now. + # Default combination if no input is provided (i.e. 'all' is selected). + # * TLS - models [torch/mnist, keras/mnist] and python versions [3.10, 3.11, 3.12] + # * Non-TLS - models [torch/mnist] and python version [3.10] + # * No client auth - models [keras/mnist] and python version [3.10] + # * Memory logs - models [torch/mnist] and python version [3.10] + # --------------------------------------------------------------- + echo "jobs_to_run=${{ env.JOBS_TO_RUN }}" >> "$GITHUB_OUTPUT" + + if [ "${{ env.MODEL_NAME }}" == "all" ]; then + echo "models_for_tls=[\"torch/mnist\", \"keras/mnist\"]" >> "$GITHUB_OUTPUT" + echo "models_for_non_tls=[\"torch/mnist\"]" >> "$GITHUB_OUTPUT" + echo "models_for_no_client_auth=[\"keras/mnist\"]" >> "$GITHUB_OUTPUT" + echo "models_for_memory_logs=[\"torch/mnist\"]" >> "$GITHUB_OUTPUT" + else + echo "models_for_tls=[\"${{env.MODEL_NAME}}\"]" >> "$GITHUB_OUTPUT" + echo "models_for_non_tls=[\"${{env.MODEL_NAME}}\"]" >> "$GITHUB_OUTPUT" + echo "models_for_no_client_auth=[\"${{env.MODEL_NAME}}\"]" >> "$GITHUB_OUTPUT" + echo "models_for_memory_logs=[\"${{env.MODEL_NAME}}\"]" >> "$GITHUB_OUTPUT" + fi + if [ "${{ env.PYTHON_VERSION }}" == "all" ]; then + echo "python_for_tls=[\"3.10\", \"3.11\", \"3.12\"]" >> "$GITHUB_OUTPUT" + echo "python_for_non_tls=[\"3.10\"]" >> "$GITHUB_OUTPUT" + echo "python_for_no_client_auth=[\"3.10\"]" >> "$GITHUB_OUTPUT" + echo "python_for_memory_logs=[\"3.10\"]" >> "$GITHUB_OUTPUT" + else + echo "python_for_tls=[\"${{env.PYTHON_VERSION}}\"]" >> "$GITHUB_OUTPUT" + echo "python_for_non_tls=[\"${{env.PYTHON_VERSION}}\"]" >> "$GITHUB_OUTPUT" + echo "python_for_no_client_auth=[\"${{env.PYTHON_VERSION}}\"]" >> "$GITHUB_OUTPUT" + echo "python_for_memory_logs=[\"${{env.PYTHON_VERSION}}\"]" >> "$GITHUB_OUTPUT" + fi + + test_with_tls: + name: With TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_tls' + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ${{ fromJson(needs.input_selection.outputs.selected_models_for_tls) }} + python_version: ${{ fromJson(needs.input_selection.outputs.selected_python_for_tls) }} + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: 
checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls" + + test_with_non_tls: + name: Without TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_non_tls' + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ${{ fromJson(needs.input_selection.outputs.selected_models_for_non_tls) }} + python_version: ${{ fromJson(needs.input_selection.outputs.selected_python_for_non_tls) }} + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls" + + test_with_no_client_auth: + name: Without client auth + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_no_client_auth' + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ${{ fromJson(needs.input_selection.outputs.selected_models_for_no_client_auth) }} + python_version: ${{ fromJson(needs.input_selection.outputs.selected_python_for_no_client_auth) }} + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + 
test_type: 'tr_no_client_auth' + + test_memory_logs: + name: With memory logs + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_memory_logs' + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ${{ fromJson(needs.input_selection.outputs.selected_models_for_memory_logs) }} + python_version: ${{ fromJson(needs.input_selection.outputs.selected_python_for_memory_logs) }} + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS and memory logs + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ + -k test_log_memory_usage_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage + echo "Task runner memory logs test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_memory_logs" + + # to remove later + test_with_tls_restart: + name: With TLS + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["torch/mnist", "keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_with_restart" + + test_with_non_tls_restart: + name: Without TLS + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["torch/mnist", "keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ 
env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_restart" + + test_with_no_client_auth_restart: + name: Without client auth + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["torch/mnist", "keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: 'tr_no_client_auth_restart' diff --git a/.github/workflows/test_task_runner_dockerized_ws_e2e.yml b/.github/workflows/test_task_runner_dockerized_ws_e2e.yml new file mode 100644 index 0000000000..8111eb2e88 --- /dev/null +++ b/.github/workflows/test_task_runner_dockerized_ws_e2e.yml @@ -0,0 +1,347 @@ +--- +# Task Runner E2E tests for dockerized approach + +name: Task_Runner_Dockerized_E2E # Please do not modify the name as it is used in the composite action + +on: + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + jobs_to_run: + description: "Jobs to run" + type: choice + default: "all" + options: + - all + - test_with_tls + - test_with_non_tls + - test_with_no_client_auth + - test_memory_logs + required: false + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + JOBS_TO_RUN: ${{ inputs.jobs_to_run || 'all' }} + +jobs: + input_selection: + name: Input value selection + runs-on: ubuntu-22.04 + outputs: + selected_jobs: ${{ steps.input_selection.outputs.jobs_to_run }} + steps: + - name: Job to select input values + id: input_selection + run: | + echo "jobs_to_run=${{ env.JOBS_TO_RUN }}" >> "$GITHUB_OUTPUT" + + test_with_tls: + name: With TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_tls' + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: 
./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_dockerized_ws" + + test_with_non_tls_dockerized_ws: + name: Without TLS + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_non_tls' + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_dockerized_ws" + + test_with_no_client_auth_dockerized_ws: + name: Without client auth + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_with_no_client_auth' + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth_dockerized_ws" + + test_memory_logs_dockerized_ws: + name: With memory logs + needs: input_selection + if: needs.input_selection.outputs.selected_jobs == 'all' || needs.input_selection.outputs.selected_jobs == 'test_memory_logs' + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: 
["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS and memory logs + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ + -k test_log_memory_usage_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage + echo "Task runner memory logs test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_memory_logs_dockerized_ws" + + # TODO - remove it before merging. + test_with_tls_restart: + name: With TLS + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_with_restart" + + test_with_non_tls_restart: + name: Without TLS + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_restart" + + test_with_no_client_auth_restart: + name: Without client auth + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + model_name: ["keras/mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately 
fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_resiliency_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: 'tr_no_client_auth_restart' diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py index 05bdb74d1c..b56afccce1 100644 --- a/tests/end_to_end/conftest.py +++ b/tests/end_to_end/conftest.py @@ -197,21 +197,3 @@ def pytest_sessionfinish(session, exitstatus): dh.cleanup_docker_containers(list_of_containers=["aggregator", "collaborator*"]) # Cleanup docker network created for openfl, if any. dh.remove_docker_network(["openfl"]) - - -def pytest_configure(config): - """ - Configure the pytest plugin. - Args: - config: pytest config object - """ - # Declare some global variables - args = parse_arguments() - # Use the model name from the test case name if not provided as a command line argument - config.model_name = args.model_name - config.num_collaborators = args.num_collaborators - config.num_rounds = args.num_rounds - config.require_client_auth = not args.disable_client_auth - config.use_tls = not args.disable_tls - config.log_memory_usage = args.log_memory_usage - config.results_dir = config.getini("results_dir") diff --git a/tests/end_to_end/models/aggregator.py b/tests/end_to_end/models/aggregator.py index 29c184781c..adf3aa0a3d 100644 --- a/tests/end_to_end/models/aggregator.py +++ b/tests/end_to_end/models/aggregator.py @@ -4,6 +4,7 @@ import logging import os +import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.exceptions as ex import tests.end_to_end.utils.federation_helper as fh @@ -19,7 +20,7 @@ class Aggregator(): 2. Starting the aggregator """ - def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None, eval_scope=False): + def __init__(self, agg_domain_name, workspace_path, eval_scope=False, container_id=None): """ Initialize the Aggregator class Args: @@ -31,8 +32,10 @@ def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None, self.name = "aggregator" self.agg_domain_name = agg_domain_name self.workspace_path = workspace_path - self.container_id = container_id self.eval_scope = eval_scope + self.container_id = container_id + self.tensor_db_file = os.path.join(self.workspace_path, "local_state", "tensor.db") + self.res_file = None # Result file to track the logs def generate_sign_request(self): """ @@ -52,34 +55,32 @@ def generate_sign_request(self): except Exception as e: raise ex.CSRGenerationException(f"Failed to generate sign request for {self.name}: {e}") - def start(self, res_file, with_docker=False): + def start(self, res_file): """ Start the aggregator Args: res_file (str): Result file to track the logs - with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. 
Returns: str: Path to the log file """ try: log.info(f"Starting {self.name}") - res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = "Failed to start the aggregator" - command = "fx aggregator start" + command = constants.AGG_START_CMD if self.eval_scope: command = f"{command} --task_group evaluation" fh.run_command( command=command, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path if not with_docker else "", + workspace_path=self.workspace_path, run_in_background=True, bg_file=res_file, - with_docker=with_docker ) log.info( f"Started {self.name} and tracking the logs in {res_file}." ) + self.res_file = res_file except Exception as e: log.error(f"{error_msg}: {e}") raise e diff --git a/tests/end_to_end/models/collaborator.py b/tests/end_to_end/models/collaborator.py index dd9389128b..2fe52038e1 100644 --- a/tests/end_to_end/models/collaborator.py +++ b/tests/end_to_end/models/collaborator.py @@ -4,7 +4,7 @@ import os import logging -import tests.end_to_end.utils.docker_helper as dh +import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.exceptions as ex import tests.end_to_end.utils.federation_helper as fh @@ -36,6 +36,7 @@ def __init__(self, collaborator_name=None, data_directory_path=None, workspace_p self.data_directory_path = data_directory_path self.workspace_path = workspace_path self.container_id = container_id + self.res_file = None # Result file to track the logs def generate_sign_request(self): """ @@ -114,31 +115,29 @@ def import_pki(self, zip_name, with_docker=False): raise e return True - def start(self, res_file, with_docker=False): + def start(self, res_file): """ Start the collaborator Args: res_file (str): Result file to track the logs - with_docker (bool): Flag to run the collaborator inside a docker container Returns: str: Path to the log file """ try: log.info(f"Starting {self.collaborator_name}") - res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = f"Failed to start {self.collaborator_name}" fh.run_command( - f"fx collaborator start -n {self.collaborator_name}", + constants.COL_START_CMD.format(self.collaborator_name), error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path if not with_docker else "", + workspace_path=self.workspace_path, run_in_background=True, bg_file=res_file, - with_docker=with_docker ) log.info( f"Started {self.name} and tracking the logs in {res_file}." 
) + self.res_file = res_file except Exception as e: log.error(f"{error_msg}: {e}") raise e @@ -164,26 +163,6 @@ def install_dependencies(self): raise e return True - def setup_col_docker_env(self, workspace_path, local_bind_path): - """ - Setup the collaborator docker environment - Args: - workspace_path (str): Workspace path - local_bind_path (str): Local bind path - """ - try: - container = dh.start_docker_container( - container_name=self.collaborator_name, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - self.container_id = container.id - - log.info(f"Setup of {self.collaborator_name} docker environment is complete") - except Exception as e: - log.error(f"Failed to setup {self.collaborator_name} docker environment: {e}") - raise e - def import_workspace(self): """ Import the workspace diff --git a/tests/end_to_end/models/model_owner.py b/tests/end_to_end/models/model_owner.py index d03b493cbb..f347bab822 100644 --- a/tests/end_to_end/models/model_owner.py +++ b/tests/end_to_end/models/model_owner.py @@ -64,7 +64,7 @@ def create_workspace(self): output, return_code, error, - error_msg, f"Created the workspace {self.workspace_name} for the {self.model_name} model", + error_msg, f"Created the workspace for {self.model_name}", raise_exception=True ) diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index 09083c9512..7fd5af8a8f 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -54,15 +54,15 @@ def _log_memory_usage(request, fed_obj): """ # Start the federation if request.config.test_env == "task_runner_basic": - results = fed_helper.run_federation(fed_obj) + assert fed_helper.run_federation(fed_obj) else: - results = fed_helper.run_federation_for_dws( + assert fed_helper.run_federation_for_dws( fed_obj, use_tls=request.config.use_tls ) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fed_obj, results, test_env=request.config.test_env, num_rounds=request.config.num_rounds + fed_obj, test_env=request.config.test_env, num_rounds=request.config.num_rounds ), "Federation completion failed" # Verify the aggregator memory logs @@ -87,10 +87,6 @@ def _log_memory_usage(request, fed_obj): collaborator_memory_usage_file = constants.COL_MEM_USAGE_JSON.format( fed_obj.workspace_path, collaborator.name ) - if request.config.test_env == "task_runner_dockerized_ws": - ssh.copy_file_from_docker( - collaborator.name, f"/workspace/logs/{collaborator.name}_memory_usage.json", collaborator_memory_usage_file - ) assert os.path.exists( collaborator_memory_usage_file ), f"Memory usage file for collaborator {collaborator.collaborator_name} is not available" diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index ff2cbae58a..e75fe16f86 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -3,14 +3,12 @@ import pytest import logging -import os from tests.end_to_end.utils.tr_common_fixtures import ( fx_federation_tr, fx_federation_tr_dws, ) from tests.end_to_end.utils import federation_helper as fed_helper -from tests.end_to_end.utils.summary_helper import get_best_agg_score log = logging.getLogger(__name__) @@ -24,18 +22,16 @@ def test_federation_via_native(request, fx_federation_tr): fx_federation_tr (Fixture): Pytest fixture for native task runner """ # Start the federation - 
results = fed_helper.run_federation(fx_federation_tr)
+    assert fed_helper.run_federation(fx_federation_tr)
 
     # Verify the completion of the federation run
     assert fed_helper.verify_federation_run_completion(
         fx_federation_tr,
-        results,
         test_env=request.config.test_env,
         num_rounds=request.config.num_rounds,
     ), "Federation completion failed"
 
-    tensor_db_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "local_state", "tensor.db")
-    best_agg_score = get_best_agg_score(tensor_db_file_path)
+    best_agg_score = fed_helper.get_best_agg_score(fx_federation_tr.aggregator.tensor_db_file)
     log.info(f"Model best aggregated score post {request.config.num_rounds} is {best_agg_score}")
 
 
@@ -48,18 +44,14 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws):
         fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace
     """
     # Start the federation
-    results = fed_helper.run_federation_for_dws(
-        fx_federation_tr_dws, use_tls=request.config.use_tls
-    )
+    assert fed_helper.run_federation_for_dws(fx_federation_tr_dws, request.config.use_tls)
 
     # Verify the completion of the federation run
     assert fed_helper.verify_federation_run_completion(
         fx_federation_tr_dws,
-        results,
         test_env=request.config.test_env,
         num_rounds=request.config.num_rounds,
     ), "Federation completion failed"
 
-    tensor_db_file_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "local_state", "tensor.db")
-    best_agg_score = get_best_agg_score(tensor_db_file_path)
+    best_agg_score = fed_helper.get_best_agg_score(fx_federation_tr_dws.aggregator.tensor_db_file)
     log.info(f"Model best aggregated score post {request.config.num_rounds} is {best_agg_score}")
diff --git a/tests/end_to_end/test_suites/tr_resiliency_tests.py b/tests/end_to_end/test_suites/tr_resiliency_tests.py
new file mode 100644
index 0000000000..f0e9647ec8
--- /dev/null
+++ b/tests/end_to_end/test_suites/tr_resiliency_tests.py
@@ -0,0 +1,124 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+import logging
+
+from tests.end_to_end.utils.tr_common_fixtures import (
+    fx_federation_tr,
+    fx_federation_tr_dws,
+)
+from tests.end_to_end.utils import federation_helper as fed_helper
+from tests.end_to_end.utils import interruption_helper as int_helper
+
+log = logging.getLogger(__name__)
+
+
+# IMPORTANT - Please run the resiliency scenarios with a higher number of rounds.
+
+
+@pytest.mark.task_runner_basic
+def test_federation_via_native_with_restarts(request, fx_federation_tr):
+    """
+    Test federation with aggregator and collaborator restarts via the native task runner.
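+
+    Flow (sketch; the same helpers are used by _perform_restart_validate_rounds
+    below): capture the current round from the aggregator's tensor.db, restart a
+    participant, then assert the round number keeps advancing::
+
+        init_round = fed_helper.get_current_round(db_file)
+        assert int_helper.restart_participants([fed_obj.aggregator])
+        assert fed_helper.validate_round_increment(init_round, db_file)
+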
+    Args:
+        request (Fixture): Pytest fixture
+        fx_federation_tr (Fixture): Pytest fixture for native task runner
+    """
+    # Start the federation
+    assert fed_helper.run_federation(fx_federation_tr)
+
+    db_file = fx_federation_tr.aggregator.tensor_db_file
+    _perform_restart_validate_rounds(fed_obj=fx_federation_tr, db_file=db_file, total_rounds=request.config.num_rounds)
+
+    # Verify the completion of the federation run
+    assert fed_helper.verify_federation_run_completion(
+        fx_federation_tr,
+        test_env=request.config.test_env,
+        num_rounds=request.config.num_rounds,
+    )
+
+    best_agg_score = fed_helper.get_best_agg_score(db_file)
+    log.info(
+        f"Model best aggregated score post {request.config.num_rounds} rounds is {best_agg_score}"
+    )
+
+    log.info(
+        "Successfully tested federation experiment with multiple restart scenarios"
+    )
+
+
+@pytest.mark.task_runner_dockerized_ws
+def test_federation_via_dws_with_restarts(request, fx_federation_tr_dws):
+    """
+    Test federation with aggregator and collaborator restarts via dockerized workspace.
+    Args:
+        request (Fixture): Pytest fixture
+        fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace
+    """
+    # Start the federation
+    assert fed_helper.run_federation_for_dws(fx_federation_tr_dws, request.config.use_tls)
+
+    db_file = fx_federation_tr_dws.aggregator.tensor_db_file
+    _perform_restart_validate_rounds(fed_obj=fx_federation_tr_dws, db_file=db_file, total_rounds=request.config.num_rounds)
+
+    # Verify the completion of the federation run
+    assert fed_helper.verify_federation_run_completion(
+        fx_federation_tr_dws,
+        test_env=request.config.test_env,
+        num_rounds=request.config.num_rounds,
+    )
+
+    best_agg_score = fed_helper.get_best_agg_score(db_file)
+    log.info(
+        f"Model best aggregated score post {request.config.num_rounds} rounds is {best_agg_score}"
+    )
+
+    log.info(
+        "Successfully tested federation experiment with multiple restart scenarios"
+    )
+
+
+def _perform_restart_validate_rounds(fed_obj, db_file, total_rounds):
+    """
+    Internal function to perform restart and validate rounds.
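+
+    Assumes fed_helper.validate_round_increment waits until the round recorded
+    in tensor.db moves past the given baseline and returns the new round number
+    (falsy on timeout), so each restart below feeds its result into the next
+    check, e.g. (sketch):
+
+        round_after = fed_helper.validate_round_increment(init_round, db_file)
+        assert round_after, "Round did not advance after restart"
+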
+ Args: + fed_obj (Fixture): Pytest fixture for federation + db_file (str): Path to the database file + total_rounds (int): Total number of rounds + """ + + init_round = fed_helper.get_current_round(db_file) + + # Restart aggregator + assert int_helper.restart_participants([fed_obj.aggregator]) + log.info("Aggregator restarted successfully") + + assert ( + round_post_agg_restart := fed_helper.validate_round_increment( + init_round, db_file + ) + ), f"Expected current round to be ahead of {init_round} after aggregator restart" + + # Restart collaborators + assert int_helper.restart_participants(fed_obj.collaborators) + log.info("Collaborators restarted successfully") + + assert ( + round_post_collab_restart := fed_helper.validate_round_increment( + round_post_agg_restart, db_file + ) + ), f"Expected current round to be ahead of {round_post_agg_restart} after collaborators restart" + + # Restart all participants + assert int_helper.restart_participants(fed_obj.collaborators + [fed_obj.aggregator]) + log.info("All participants restarted successfully") + + assert fed_helper.validate_round_increment( + round_post_collab_restart, db_file, + total_rounds, + ), f"Expected current round to be ahead of {round_post_collab_restart} after all participants restart" + + log.info("Current round number is increasing after every restart as expected.") diff --git a/tests/end_to_end/test_suites/tr_with_fedeval_tests.py b/tests/end_to_end/test_suites/tr_with_fedeval_tests.py index dacbb9f3a5..ec3735acb2 100644 --- a/tests/end_to_end/test_suites/tr_with_fedeval_tests.py +++ b/tests/end_to_end/test_suites/tr_with_fedeval_tests.py @@ -11,7 +11,6 @@ ) from tests.end_to_end.utils import federation_helper as fed_helper from tests.end_to_end.utils.tr_workspace import create_tr_workspace, create_tr_dws_workspace -from tests.end_to_end.utils.summary_helper import get_best_agg_score log = logging.getLogger(__name__) @@ -26,35 +25,32 @@ def test_eval_federation_via_native(request, fx_federation_tr): fx_federation_tr (Fixture): Pytest fixture for native task runner """ # Start the federation - results = fed_helper.run_federation(fx_federation_tr) + assert fed_helper.run_federation(fx_federation_tr) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( fx_federation_tr, - results, test_env=request.config.test_env, num_rounds=request.config.num_rounds, ), "Federation completion failed" # Set the best model path in request. 
It is used during plan initialization for evaluation step request.config.best_model_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "save", "best.pbuf") - tensor_db_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "local_state", "tensor.db") - best_model_score = get_best_agg_score(tensor_db_file_path) + best_model_score = fed_helper.get_best_agg_score(fx_federation_tr.aggregator.tensor_db_file) log.info(f"Model score post {request.config.num_rounds} rounds: {best_model_score}") # Create new workspace with evaluation scope new_fed_obj = create_tr_workspace(request, eval_scope=True) - results_new = fed_helper.run_federation(new_fed_obj) + assert fed_helper.run_federation(new_fed_obj) + # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( new_fed_obj, - results_new, test_env=request.config.test_env, num_rounds=1, ), "Federation completion failed" - tensor_db_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "local_state", "tensor.db") - best_model_score_eval = get_best_agg_score(tensor_db_file_path) + best_model_score_eval = fed_helper.get_best_agg_score(new_fed_obj.aggregator.tensor_db_file) log.info(f"Model score post {request.config.num_rounds} rounds: {best_model_score}") # verify that the model score is similar to the previous model score max of 0.001% difference @@ -70,14 +66,13 @@ def test_eval_federation_via_dockerized_workspace(request, fx_federation_tr_dws) fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace """ # Start the federation - results = fed_helper.run_federation_for_dws( + assert fed_helper.run_federation_for_dws( fx_federation_tr_dws, use_tls=request.config.use_tls ) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( fx_federation_tr_dws, - results, test_env=request.config.test_env, num_rounds=request.config.num_rounds, ), "Federation completion failed" @@ -85,24 +80,21 @@ def test_eval_federation_via_dockerized_workspace(request, fx_federation_tr_dws) # Set the best model path in request. 
It is used during plan initialization for evaluation step request.config.best_model_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "save", "best.pbuf") - tensor_db_file_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "local_state", "tensor.db") - best_model_score = get_best_agg_score(tensor_db_file_path) + best_model_score = fed_helper.get_best_agg_score(fx_federation_tr_dws.aggregator.tensor_db_file) log.info(f"Model score post {request.config.num_rounds} rounds: {best_model_score}") # Create new workspace with evaluation scope new_fed_obj = create_tr_dws_workspace(request, eval_scope=True) - results_new = fed_helper.run_federation_for_dws(new_fed_obj, use_tls=request.config.use_tls) + assert fed_helper.run_federation_for_dws(new_fed_obj, use_tls=request.config.use_tls) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( new_fed_obj, - results_new, test_env=request.config.test_env, num_rounds=1, ), "Federation completion failed" - tensor_db_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "local_state", "tensor.db") - best_model_score_eval = get_best_agg_score(tensor_db_file_path) + best_model_score_eval = fed_helper.get_best_agg_score(new_fed_obj.aggregator.tensor_db_file) log.info(f"Model score post {request.config.num_rounds} rounds: {best_model_score}") # verify that the model score is similar to the previous model score max of 0.001% difference diff --git a/tests/end_to_end/utils/conftest_helper.py b/tests/end_to_end/utils/conftest_helper.py index af912dd2c6..b05cfb45b2 100644 --- a/tests/end_to_end/utils/conftest_helper.py +++ b/tests/end_to_end/utils/conftest_helper.py @@ -27,12 +27,12 @@ def parse_arguments(): """ try: parser = argparse.ArgumentParser(description="Provide the required arguments to run the tests") - parser.add_argument("--num_collaborators", type=int, default=2, help="Number of collaborators") - parser.add_argument("--num_rounds", type=int, default=5, help="Number of rounds to train") - parser.add_argument("--model_name", type=str, help="Model name") - parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication") - parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication") - parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs") + parser.add_argument("--num_collaborators", type=int, default=2, help="Number of collaborators. Default is 2") + parser.add_argument("--num_rounds", type=int, default=5, help="Number of rounds to train. Default is 5") + parser.add_argument("--model_name", type=str, help="Model name. Not required for Workflow APIs") + parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication. Default is False") + parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication. Default is False") + parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs. 
Default is False")
 
         args = parser.parse_known_args()[0]
         return args
 
diff --git a/tests/end_to_end/utils/constants.py b/tests/end_to_end/utils/constants.py
index 94cf4d1999..4bb28d054f 100644
--- a/tests/end_to_end/utils/constants.py
+++ b/tests/end_to_end/utils/constants.py
@@ -43,3 +43,9 @@ class ModelName(Enum):
 # Memory logs related
 AGG_MEM_USAGE_JSON = "{}/aggregator/workspace/logs/aggregator_memory_usage.json" # example - /tmp/my_federation/aggregator/workspace/logs/aggregator_memory_usage.json
 COL_MEM_USAGE_JSON = "{0}/{1}/workspace/logs/{1}_memory_usage.json" # example - /tmp/my_federation/collaborator1/workspace/logs/collaborator1_memory_usage.json
+
+AGG_START_CMD = "fx aggregator start"
+COL_START_CMD = "fx collaborator start -n {}"
+
+COL_CERTIFY_CMD = "fx collaborator certify --import 'agg_to_col_{}_signed_cert.zip'"
+DFLT_DOCKERIZE_IMAGE_NAME = "workspace"
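The db_helper changes below replace the old tuple-based reader with a generic key-value fetch that retries, since tensor.db is created and populated only some time after the aggregator starts. A minimal standalone sketch of that polling pattern (the function name poll_db_value and the literal call are illustrative; the key_value_store schema is the one documented in db_helper.py):

    import os
    import sqlite3
    import time

    def poll_db_value(db_file, key, retries=10, interval=5):
        # Retry because the aggregator creates and fills tensor.db asynchronously.
        for _ in range(retries):
            if os.path.exists(db_file):
                conn = sqlite3.connect(db_file)
                row = conn.execute(
                    "SELECT value FROM key_value_store WHERE key = ?", (key,)
                ).fetchone()
                conn.close()
                if row:
                    return row[0]
            time.sleep(interval)
        raise RuntimeError(f"{key} not available in {db_file} after {retries} tries")

    # Example usage: poll_db_value("/path/to/tensor.db", "round_number")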
diff --git a/tests/end_to_end/utils/db_helper.py b/tests/end_to_end/utils/db_helper.py
index fe185a7452..772b1b39e0 100644
--- a/tests/end_to_end/utils/db_helper.py
+++ b/tests/end_to_end/utils/db_helper.py
@@ -1,7 +1,11 @@
 # Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import os
 import sqlite3
+import time
+
+import tests.end_to_end.utils.exceptions as ex
 
 # Database schema:
 # Table: key_value_store
@@ -28,17 +32,13 @@ def close(self):
 
     def read_key_value_store(self):
         """
-        key_value_store - This table holds key-value pairs. It only holds best_score and round_number.
-        This table holds which round had best score and what is best score until now in experiment.
-
-        Reads key-value pairs from the key_value_store table in the database.
+        Reads key-value pairs from the key_value_store table in the database.
         This method connects to the database, executes a query to fetch all key-value pairs
-        from the key_value_store table, and then closes the connection. It returns the values
-        associated with the 'round_number' and 'best_score' keys.
-        Raises:
-            ValueError: If either 'round_number' or 'best_score' keys are not found in the key_value_store.
+        from the key_value_store table, and then closes the connection.
+        Currently, the table only holds best_score and round_number.
+
         Returns:
-            tuple: A tuple containing the values of 'round_number' and 'best_score'.
+            dict: A dictionary containing all key-value pairs from the key_value_store table.
         """
         self.connect()
         self.cursor.execute("SELECT key, value FROM key_value_store")
@@ -46,7 +48,35 @@ def read_key_value_store(self):
 
         key_value_dict = {row[0]: row[1] for row in rows}
 
-        if 'round_number' not in key_value_dict or 'best_score' not in key_value_dict:
-            raise ValueError("Required keys 'round_number' and 'best_score' not found in key_value_store")
+        # DO NOT add any exception here as the calling functions have retries and will handle the exception.
+
+        return key_value_dict
+
+
+def get_key_value_from_db(key, database_file, max_retries=10, sleep_interval=5):
+    """
+    Get value by key from the database file
+    Args:
+        key (str): Key to search. For example - round_number, best_score.
+        database_file (str): Database file
+        max_retries (int): Maximum number of retries if the file or the key is not yet available
+        sleep_interval (int): Time to wait between retries in seconds
+    Returns:
+        str: Value for the key
+    """
+    retries = 0
+    # Observation - it always takes a few attempts in the beginning to get the values from the database.
+    while retries < max_retries:
+        if os.path.exists(database_file):
+            db_obj = DBHelper(database_file)
+            val = db_obj.read_key_value_store().get(key)
+            if val:
+                return val
+            print(f"Value not found in the database. Retrying in {sleep_interval} seconds...")
+        else:
+            print(f"Database file not found. Retrying in {sleep_interval} seconds...")
+
+        time.sleep(sleep_interval)
+        retries += 1
-    return key_value_dict['round_number'], key_value_dict['best_score']
+
+    raise ex.TensorDBException(f"Failed to get value for key {key} from the database after {max_retries} retries.")
diff --git a/tests/end_to_end/utils/docker_helper.py b/tests/end_to_end/utils/docker_helper.py
index f69e494cbb..45c7e72e36 100644
--- a/tests/end_to_end/utils/docker_helper.py
+++ b/tests/end_to_end/utils/docker_helper.py
@@ -3,7 +3,6 @@
 import logging
 
 import docker
-import os
 from functools import lru_cache
 
 import tests.end_to_end.utils.constants as constants
@@ -60,10 +59,9 @@ def check_docker_image():
     log.debug(f"Image {constants.DEFAULT_OPENFL_IMAGE} exists")
 
 
-def start_docker_container(
-    container_name,
-    workspace_path,
-    local_bind_path,
+def start_docker_container_with_federation_run(
+    participant,
+    use_tls=True,
     image=constants.DEFAULT_OPENFL_IMAGE,
     network=constants.DOCKER_NETWORK_NAME,
     env_keyval_list=None,
@@ -71,11 +69,11 @@
     mount_mapping=None,
 ):
     """
-    Start the docker container with provided name.
+    Start the docker container for the given participant and set its container ID.
+    IMPORTANT: Internally runs the command to start the federation run.
     Args:
-        container_name: Name of the container
-        workspace_path: Workspace path
-        local_bind_path: Local bind path
+        participant: Participant object (aggregator/collaborator)
+        use_tls: Flag to indicate if TLS is enabled. Default is True.
         image: Docker image to use
         network: Docker network to use (default is openfl)
         env_keyval_list: List of environment variables to set.
@@ -93,13 +91,14 @@
         local_participant_path = mount_mapping[0].split(":")[0]
         docker_participant_path = mount_mapping[0].split(":")[1]
     else:
-        local_participant_path = os.path.join(local_bind_path, container_name, "workspace")
-        docker_participant_path = "/workspace"
+        local_participant_path = participant.workspace_path
+
+        docker_participant_path = f"/{constants.DFLT_DOCKERIZE_IMAGE_NAME}"
 
     volumes = {
         local_participant_path: {"bind": docker_participant_path, "mode": "rw"},
     }
-    log.debug(f"Volumes for {container_name}: {volumes}")
+    log.debug(f"Volumes for {participant.name}: {volumes}")
 
     environment = {
         "WORKSPACE_PATH": docker_participant_path,
@@ -111,7 +110,26 @@
         key, val = keyval.split("=")
         environment[key] = val
 
-    log.debug(f"Environment variables for {container_name}: {environment}")
+    log.debug(f"Environment variables for {participant.name}: {environment}")
+
+    # Prepare the commands to run based on the participant
+    log_file = f"{docker_participant_path}/{participant.name}.log"
+
+    if participant.name == "aggregator":
+        start_agg = constants.AGG_START_CMD
+        # Handle Fed Eval case
+        if participant.eval_scope:
+            start_agg += " --task_group evaluation"
+        command = ["bash", "-c", f"touch {log_file} && {start_agg} > {log_file} 2>&1"]
+    else:
+        start_collaborator = f"touch {log_file} && {constants.COL_START_CMD.format(participant.name)} > {log_file} 2>&1"
+        if use_tls:
+            command = ["bash", "-c", f"{constants.COL_CERTIFY_CMD.format(participant.name)} && {start_collaborator}"]
+        else:
+            command = ["bash", "-c", start_collaborator]
+
+    log.info(f"Command for {participant.name}: {command}")
+
+    # Start a container from the image
     container = client.containers.run(
         image,
@@ -119,14 +137,15 @@
         user="root",
         auto_remove=False,
         tty=True,
-            name=container_name,
+            name=participant.name,
             network=network,
             security_opt=security_opt,
             volumes=volumes,
             environment=environment,
             use_config_proxy=False,  # Do not use proxy for docker container
+            command=command
         )
-        log.info(f"Container for {container_name} started with ID: {container.id}")
+        log.info(f"Container for {participant.name} started with ID: {container.id}")
 
     except Exception as e:
         raise ex.DockerException(f"Error starting docker container: {e}")
@@ -169,3 +188,28 @@ def cleanup_docker_containers(list_of_containers=["aggregator", "collaborator*"]
 
     if containers:
         log.info(f"Docker containers {container_names} cleaned up successfully")
+
+
+def stop_start_docker_participant(participant, action):
+    """
+    Stop or start the docker container(s) of the given participant.
+    Args:
+        participant: Participant object
+        action (str): Action to perform (stop/start)
+    """
+    if action not in ["stop", "start"]:
+        raise ex.DockerException(f"Invalid action {action}")
+
+    client = get_docker_client()
+
+    # List containers with the participant name
+    containers = client.containers.list(all=True, filters={"name": participant.name})
+    container_names = []
+
+    for container in containers:
+        # Stop or start the container based on the requested action
+        container.stop() if action == "stop" else container.start()
+        log.debug(f"Docker {action} successful for {container.name}")
+        container_names.append(container.name)
+
+    return True
diff --git a/tests/end_to_end/utils/exceptions.py b/tests/end_to_end/utils/exceptions.py
index 2a12842080..ed81b304bf 100644
--- a/tests/end_to_end/utils/exceptions.py
+++ b/tests/end_to_end/utils/exceptions.py
@@ -91,3 +91,23 @@ class DirectorStartException(Exception):
 class DataSetupException(Exception):
     """Exception for data setup for given model"""
     pass
+
+
+class ParticipantStartException(Exception):
+    """Exception for participant start"""
+    pass
+
+
+class ParticipantStopException(Exception):
+    """Exception for participant stop"""
+    pass
+
+
+class ParticipantRestartException(Exception):
+    """Exception for participant restart"""
+    pass
+
+
+class TensorDBException(Exception):
+    """Exception for tensor database"""
+    pass
diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py
index ff37f2430c..24070b439a 100644
--- a/tests/end_to_end/utils/federation_helper.py
+++ b/tests/end_to_end/utils/federation_helper.py
@@ -13,6 +13,7 @@
 import shutil
 
 import tests.end_to_end.utils.constants as constants
+import tests.end_to_end.utils.db_helper as db_helper
 import tests.end_to_end.utils.docker_helper as dh
 import tests.end_to_end.utils.exceptions as ex
 import tests.end_to_end.utils.ssh_helper as ssh
@@ -239,15 +240,14 @@
     return True
 
 
-def run_federation(fed_obj, install_dependencies=True, with_docker=False):
+def run_federation(fed_obj, install_dependencies=True):
     """
     Start the federation
     Args:
         fed_obj (object): Federation fixture object
         install_dependencies (bool): Install dependencies on collaborators (default is True)
-        with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. 
Returns: - list: List of response files for all the participants + bool: True if successful, else False """ executor = concurrent.futures.ThreadPoolExecutor() if install_dependencies: @@ -260,7 +260,6 @@ def run_federation(fed_obj, install_dependencies=True, with_docker=False): constants.AGG_COL_RESULT_FILE.format( fed_obj.workspace_path, participant.name ), - with_docker=with_docker, ) for participant in [fed_obj.aggregator] + fed_obj.collaborators ] @@ -268,8 +267,8 @@ def run_federation(fed_obj, install_dependencies=True, with_docker=False): # Result will contain response files for all the participants. results = [f.result() for f in futures] if not all(results): - raise Exception("Failed to start one or more participants") - return results + raise ex.ParticipantStartException("Failed to start one or more participants") + return True def run_federation_for_dws(fed_obj, use_tls): @@ -279,48 +278,23 @@ def run_federation_for_dws(fed_obj, use_tls): fed_obj (object): Federation fixture object use_tls (bool): Use TLS or not (default is True) Returns: - list: List of response files for all the participants + bool: True if successful, else False """ - executor = concurrent.futures.ThreadPoolExecutor() - - try: - results = [ - executor.submit( - run_command, - command=f"tar -xf /workspace/cert_{participant.name}.tar", - workspace_path="", - error_msg=f"Failed to extract certificates for {participant.name}", - container_id=participant.container_id, - with_docker=True, - ) - for participant in [fed_obj.aggregator] + fed_obj.collaborators - ] - if not all([f.result() for f in results]): - raise Exception( - "Failed to extract certificates for one or more participants" - ) - except Exception as e: - raise e - - if use_tls: + for participant in [fed_obj.aggregator] + fed_obj.collaborators: try: - results = [ - executor.submit( - collaborator.import_pki, - zip_name=f"agg_to_col_{collaborator.name}_signed_cert.zip", - with_docker=True, - ) - for collaborator in fed_obj.collaborators - ] - if not all([f.result() for f in results]): - raise Exception( - "Failed to import and certify the CSR for one or more collaborators" - ) + container = dh.start_docker_container_with_federation_run( + participant=participant, + image=constants.DFLT_DOCKERIZE_IMAGE_NAME, + use_tls=use_tls, + ) except Exception as e: + log.error(f"Failed to start docker container for {participant.name}: {e}") raise e - # Start federation run for all the participants - return run_federation(fed_obj, with_docker=True) + participant.container_id = container.id + participant.res_file = os.path.join(participant.workspace_path, f"{participant.name}.log") + + return True def install_dependencies_on_collaborators(fed_obj): @@ -344,12 +318,11 @@ def install_dependencies_on_collaborators(fed_obj): raise Exception("Failed to install dependencies on one or more collaborators") -def verify_federation_run_completion(fed_obj, results, test_env, num_rounds): +def verify_federation_run_completion(fed_obj, test_env, num_rounds): """ Verify the completion of the process for all the participants Args: fed_obj (object): Federation fixture object - results (list): List of results test_env (str): Test environment num_rounds (int): Number of rounds Returns: @@ -364,33 +337,28 @@ def verify_federation_run_completion(fed_obj, results, test_env, num_rounds): _verify_completion_for_participant, participant, num_rounds, - results[i], - test_env, - local_bind_path=fed_obj.local_bind_path, ) - for i, participant in enumerate(fed_obj.collaborators + 
[fed_obj.aggregator]) + for participant in fed_obj.collaborators + [fed_obj.aggregator] ] # Result will contain a list of boolean values for all the participants. # True - successful completion, False - failed/incomplete results = [f.result() for f in futures] - log.info(f"Results from all the participants: {results}") + log.debug(f"Results from all the participants: {results}") # If any of the participant failed, return False, else return True return all(results) def _verify_completion_for_participant( - participant, num_rounds, result_file, test_env, time_for_each_round=100, local_bind_path=None + participant, num_rounds, time_for_each_round=100 ): """ Verify the completion of the process for the participant Args: participant (object): Participant object num_rounds (int): Number of rounds - result_file (str): Result file time_for_each_round (int): Time for each round - local_bind_path (str, Optional): Local bind path. Applicable in case of docker environment Returns: bool: True if successful, else False """ @@ -398,17 +366,6 @@ def _verify_completion_for_participant( # Set timeout based on the number of rounds and time for each round timeout = 600 + (time_for_each_round * num_rounds) # in seconds - # In case of docker environment, get the logs from local path which is mounted to the container - if test_env == "task_runner_dockerized_ws": - result_file = constants.AGG_COL_RESULT_FILE.format( - local_bind_path, participant.name - ) - ssh.copy_file_from_docker( - participant.name, f"/workspace/{participant.name}.log", result_file - ) - - log.info(f"Result file is: {result_file}") - # Do not open file here as it will be opened in the loop below # Also it takes time for the federation run to start and write the logs content = [""] @@ -417,7 +374,7 @@ def _verify_completion_for_participant( while ( constants.SUCCESS_MARKER not in content and time.time() - start_time < timeout ): - with open(result_file, "r") as file: + with open(participant.res_file, "r") as file: lines = [line.strip() for line in file.readlines()] content = list(filter(str.rstrip, lines))[-1:] @@ -428,14 +385,6 @@ def _verify_completion_for_participant( log.info(f"Process is yet to complete for {participant.name}") time.sleep(45) - # Copy the log file from docker container to local machine everytime to get the latest logs - if test_env == "task_runner_dockerized_ws": - ssh.copy_file_from_docker( - participant.name, - f"/workspace/{participant.name}.log", - constants.AGG_COL_RESULT_FILE.format(local_bind_path, participant.name), - ) - if constants.SUCCESS_MARKER not in content: log.error( f"Process failed/is incomplete for {participant.name} after timeout of {timeout} seconds" @@ -453,6 +402,7 @@ def federation_env_setup_and_validate(request, eval_scope=False): Setup the federation environment and validate the configurations Args: request (object): Request object + eval_scope (bool): If True, sets up the evaluation scope for a single round Returns: tuple: Model name, workspace path, local bind path, aggregator domain name """ @@ -465,11 +415,12 @@ def federation_env_setup_and_validate(request, eval_scope=False): if not request.config.model_name.replace("/", "_").upper() in constants.ModelName._member_names_: raise ValueError(f"Invalid model name: {request.config.model_name}") - # Set the workspace path + # Set the workspace path specific to the model and the test case home_dir = Path().home() local_bind_path = os.path.join( - home_dir, request.config.results_dir, request.config.model_name.replace("/", "_") + home_dir, 
request.config.results_dir, request.node.name, request.config.model_name.replace("/", "_")
     )
+
     num_rounds = request.config.num_rounds
 
     if eval_scope:
@@ -478,6 +429,7 @@
         log.info(f"Running evaluation for the model: {request.config.model_name}")
 
     workspace_path = local_bind_path
+    # If the path already exists, delete it
     if os.path.exists(workspace_path):
         shutil.rmtree(workspace_path)
@@ -607,7 +559,7 @@
     log.info(f"Running command: {command}")
 
     if run_in_background and not with_docker:
-        bg_file = open(bg_file, "w", buffering=1)
+        bg_file = open(bg_file, "a", buffering=1)  # open the file in append mode so that restart scenarios append to the same log
         ssh.run_command_background(
             command,
             work_dir=workspace_path,
@@ -820,33 +772,6 @@
         raise e
 
 
-def start_docker_containers_for_dws(
-    participants, workspace_path, local_bind_path, image_name
-):
-    """
-    Start docker containers for the participants
-    Args:
-        participants (list): List of participant objects (collaborators and aggregator)
-        workspace_path (str): Workspace path
-        local_bind_path (str): Local bind path
-        image_name (str): Docker image name
-    """
-    for participant in participants:
-        try:
-            # In case of dockerized workspace, the workspace gets created inside folder with image name
-            container = dh.start_docker_container(
-                container_name=participant.name,
-                workspace_path=workspace_path,
-                local_bind_path=local_bind_path,
-                image=image_name,
-            )
-            participant.container_id = container.id
-        except Exception as e:
-            raise ex.DockerException(
-                f"Failed to start {participant.name} docker environment: {e}"
-            )
-
-
 def start_director(workspace_path, dir_res_file):
     """
     Start the director.
@@ -1027,3 +952,54 @@
             log.error(f"Process failed for {name}")
             return False
     return True
+
+
+def get_current_round(database_file: str) -> int:
+    """
+    Get the current round number from the database file
+    Args:
+        database_file (str): Database file
+    Returns:
+        int: Current round number
+    """
+    return int(db_helper.get_key_value_from_db("round_number", database_file))
+
+
+def get_best_agg_score(database_file: str) -> float:
+    """
+    Get the best aggregated score from the database file
+    Args:
+        database_file (str): Database file
+    Returns:
+        float: Best aggregated score
+    """
+    return float(db_helper.get_key_value_from_db("best_score", database_file))
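These two getters give the tests a cheap progress probe without parsing logs. An illustrative read (the literal path is an example; the fixtures expose the real one as aggregator.tensor_db_file):

    db_file = "/tmp/my_federation/aggregator/workspace/local_state/tensor.db"  # example path
    current_round = get_current_round(db_file)   # e.g. 3
    best_score = get_best_agg_score(db_file)     # e.g. 0.9876
    log.info(f"Round {current_round}, best score so far: {best_score}")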
+
+
+def validate_round_increment(inp_round, database_file, total_rounds, timeout=300, sleep_interval=5):
+    """
+    Validate that the round number has increased beyond inp_round by fetching the value via
+    get_key_value_from_db and retrying with a wait time until the given timeout.
+    Args:
+        inp_round (int): The initial round number to compare against.
+        database_file (str): The path to the database file.
+        total_rounds (int): The total number of rounds expected.
+        timeout (int): The maximum time to wait in seconds.
+            Default is 300 seconds as some of the models take more time to complete a round.
+        sleep_interval (int): The wait time between retries in seconds. Default is 5 seconds.
+    Returns:
+        Round number (int) if the current round number has increased, else False.
+    """
+    if inp_round == total_rounds:
+        log.info("Federation is already at the last round.")
+        return inp_round
+
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        current_round = get_current_round(database_file)
+
+        if current_round > inp_round:
+            return current_round
+        log.info(f"Round number has not increased. Retrying in {sleep_interval} seconds...")
+        time.sleep(sleep_interval)
+    return False
diff --git a/tests/end_to_end/utils/interruption_helper.py b/tests/end_to_end/utils/interruption_helper.py
new file mode 100644
index 0000000000..96f203b82c
--- /dev/null
+++ b/tests/end_to_end/utils/interruption_helper.py
@@ -0,0 +1,102 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import concurrent.futures
+import time
+import os
+import subprocess  # nosec B404
+
+import tests.end_to_end.utils.constants as constants
+import tests.end_to_end.utils.docker_helper as docker_helper
+import tests.end_to_end.utils.exceptions as ex
+
+log = logging.getLogger(__name__)
+
+
+def restart_participants(participants, action="restart") -> bool:
+    """
+    Stop, start, or restart the given participants (collaborators and/or aggregator) in the federation.
+    Args:
+        participants: List of participant objects
+        action: Action to perform (stop/start/restart). Default is restart.
+    """
+    if action not in ["stop", "start", "restart"]:
+        raise ex.ParticipantRestartException(f"Invalid action {action}")
+
+    executor = concurrent.futures.ThreadPoolExecutor()
+
+    # ASSUMPTION - if container ID is present, it's a docker environment, else native
+
+    if action in ["restart", "stop"]:
+        # Stop the participants in parallel
+        results = [
+            executor.submit(
+                stop_start_native_participant if participant.container_id is None else docker_helper.stop_start_docker_participant,
+                participant,
+                action="stop"
+            )
+            for participant in participants
+        ]
+        if not all([f.result() for f in results]):
+            raise ex.ParticipantStopException(
+                "Failed to stop one or more participants"
+            )
+
+    if action == "restart":
+        # Wait for 10 seconds between stop and start
+        time.sleep(10)
+        log.info("Waited for 10 seconds")
+
+    if action in ["restart", "start"]:
+        # Start the participants in parallel
+        results = [
+            executor.submit(
+                stop_start_native_participant if participant.container_id is None else docker_helper.stop_start_docker_participant,
+                participant,
+                action="start"
+            )
+            for participant in participants
+        ]
+        if not all([f.result() for f in results]):
+            raise ex.ParticipantStartException(
+                "Failed to start one or more participants"
+            )
+    return True
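In the resiliency suite this helper is the single entry point for fault injection; typical call patterns (fed_obj comes from the federation fixtures above) look like:

    # Full stop/wait/start cycle for every collaborator plus the aggregator.
    restart_participants(fed_obj.collaborators + [fed_obj.aggregator])

    # Or a staged outage: take only the aggregator down, then bring it back later.
    restart_participants([fed_obj.aggregator], action="stop")
    # ... assert on intermediate behaviour here ...
    restart_participants([fed_obj.aggregator], action="start")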
+
+
+def stop_start_native_participant(participant, action):
+    """
+    Stop or start the given participant's process in the native (non-docker) environment.
+    Args:
+        participant (object): Participant object
+        action: Action to perform (stop/start)
+    """
+    if action not in ["stop", "start"]:
+        raise ex.ParticipantStopException(f"Invalid action {action}")
+
+    if action == "stop":
+        cmd_for_process_kill = constants.AGG_START_CMD if participant.name == "aggregator" else constants.COL_START_CMD.format(participant.name)
+        pids = []
+        # Find the process IDs matching the start command
+        for line in os.popen(f"ps ax | grep '{cmd_for_process_kill}' | grep -v grep"):
+            fields = line.split()
+            pids.append(fields[0])
+
+        if not pids:
+            raise ex.ParticipantStopException(f"No processes found for command '{cmd_for_process_kill}'")
+
+        # Kill all processes using sudo
+        for pid in pids:
+            try:
+                subprocess.run(['sudo', 'kill', '-9', pid], check=True)
+            except subprocess.CalledProcessError as e:
+                raise ex.ParticipantStopException(f"Failed to kill process '{pid}': {e}")
+
+    else:
+        try:
+            participant.start(res_file=participant.res_file)
+        except Exception as e:
+            raise ex.ParticipantStartException(f"Error starting participant: {e}")
+
+    return True
diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py
index 486a3a6416..7e74a32ee9 100644
--- a/tests/end_to_end/utils/summary_helper.py
+++ b/tests/end_to_end/utils/summary_helper.py
@@ -9,7 +9,7 @@
 from pathlib import Path
 
 import tests.end_to_end.utils.constants as constants
-from tests.end_to_end.utils.db_helper import DBHelper
+from tests.end_to_end.utils import federation_helper as fed_helper
 
 result_path = os.path.join(Path().home(), "results")
 
@@ -32,25 +32,6 @@
     return testsuites
 
 
-def get_best_agg_score(database_file):
-    """
-    Get the best_score from the database
-    Args:
-        database_file: the database file
-    Returns:
-        best_agg_score: the best score
-    """
-    best_agg_score = "Not Found"
-    if not os.path.exists(database_file):
-        print(f"Database file {database_file} not found. 
Cannot get best aggregated score") - return best_agg_score - - db_obj = DBHelper(database_file) - round_number, best_agg_score = db_obj.read_key_value_store() - print(f"Best aggregated score: {best_agg_score} is in round_number {round_number} ") - return best_agg_score - - def get_test_status(result): """ Get the test status/verdict @@ -143,16 +124,31 @@ def print_task_runner_score(): ) return + # List all directories inside result_path + directories = [d for d in os.listdir(result_path) if os.path.isdir(os.path.join(result_path, d))] + + # Find the directory that starts with 'test_' + test_specific_result_path = None + for directory in directories: + if directory.startswith('test_'): + test_specific_result_path = os.path.join(result_path, directory) + break + + if not test_specific_result_path: + print("No directory starting with 'test_' found in the result path.") + return + # Assumption - result directory is present in the home directory tensor_db_file = os.path.join( - result_path, + test_specific_result_path, model_name, "aggregator", "workspace", "local_state", "tensor.db", ) - best_score = get_best_agg_score(tensor_db_file) + # If the federation run fails in between, tensor.db file won't be present + best_score = fed_helper.get_best_agg_score(tensor_db_file) if os.path.exists(tensor_db_file) else "Not Found" # Write the results to GitHub step summary file # This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand @@ -255,6 +251,7 @@ def fetch_args(): # Fetch input arguments args = fetch_args() func_name = args.func_name + if func_name in ["print_task_runner_score", "print_local_runtime_score"]: print_task_runner_score() elif func_name == "print_federated_runtime_score": diff --git a/tests/end_to_end/utils/tr_workspace.py b/tests/end_to_end/utils/tr_workspace.py index c9c4400c0e..019bc7c952 100644 --- a/tests/end_to_end/utils/tr_workspace.py +++ b/tests/end_to_end/utils/tr_workspace.py @@ -55,7 +55,7 @@ def create_tr_workspace(request, eval_scope=False): # Modify the plan plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) - param_config=request.config + param_config = request.config initial_model_path = None if eval_scope: @@ -81,8 +81,8 @@ def create_tr_workspace(request, eval_scope=False): aggregator = agg_model.Aggregator( agg_domain_name=agg_domain_name, workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - eval_scope=eval_scope + eval_scope=eval_scope, + container_id=model_owner.container_id, # None in case of native environment ) # Generate the sign request and certify the aggregator in case of TLS @@ -173,7 +173,7 @@ def create_tr_dws_workspace(request, eval_scope=False): # Command 'fx workspace dockerize --save ..' will use the workspace name for image name # which is 'workspace' in this case. 
model_owner.dockerize_workspace() - image_name = "workspace" + image_name = constants.DFLT_DOCKERIZE_IMAGE_NAME # Certify the workspace in case of TLS # Register the collaborators in case of non-TLS @@ -188,8 +188,8 @@ def create_tr_dws_workspace(request, eval_scope=False): aggregator = agg_model.Aggregator( agg_domain_name=agg_domain_name, workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - eval_scope=eval_scope + eval_scope=eval_scope, + container_id=model_owner.container_id, # None in case of native environment ) futures = [ @@ -234,13 +234,6 @@ def create_tr_dws_workspace(request, eval_scope=False): # to the other machine(s) so that docker load can load the image. model_owner.load_workspace(workspace_tar_name=f"{image_name}.tar") - fh.start_docker_containers_for_dws( - participants=[aggregator] + collaborators, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - image_name=image_name, - ) - # Return the federation fixture return federation_details( model_owner=model_owner,