Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agents-api): make parallelism = task_max_parallelism the default for MapReduce #1158

Merged
merged 6 commits into from
Feb 14, 2025

Conversation

Ahmad-mtos
Copy link
Contributor

@Ahmad-mtos Ahmad-mtos commented Feb 14, 2025

PR Type

Enhancement, Bug fix, Tests


Description

  • Set parallelism = task_max_parallelism as default for MapReduce.

  • Raise ValueError for subworkflow in parallel MapReduce steps.

  • Add tests for parallelism validation and subworkflow handling.

  • Refactor and lint agents-api for improved readability.


Changes walkthrough 📝

Relevant files
Enhancement
__init__.py
Default `parallelism` handling in MapReduce steps               

agents-api/agents_api/workflows/task_execution/init.py

  • Set parallelism to task_max_parallelism if not specified.
  • Adjusted logic for handling parallelism in MapReduce steps.
  • Imported task_max_parallelism for use in MapReduce logic.
  • +4/-1     
    Bug fix
    helpers.py
    Validation and constraints for parallel MapReduce               

    agents-api/agents_api/workflows/task_execution/helpers.py

  • Added validation to raise ValueError for subworkflow in parallel
    MapReduce.
  • Ensured parallelism is capped at task_max_parallelism.
  • Added assertion for parallelism to be greater than 1.
  • +5/-0     
    Tests
    test_workflow_helpers.py
    Tests for MapReduce `parallelism` and subworkflow handling

    agents-api/tests/test_workflow_helpers.py

  • Added test for subworkflow handling in parallel MapReduce.
  • Added test to ensure parallelism is greater than 1.
  • Mocked dependencies for testing MapReduce functionality.
  • +119/-0 

    Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.
  • Copy link
    Contributor

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Error Handling

    The error message for subworkflow validation could be more descriptive, explaining why subworkflows are not supported in parallel map reduce and potential alternatives.

    if isinstance(context.current_step.map, YieldStep):
        msg = "Subworkflow step not supported in parallel map reduce"
        raise ValueError(msg)
    Edge Case

    The parallelism validation logic should be consolidated. Currently parallelism=1 is handled differently in init.py vs helpers.py which could lead to inconsistent behavior.

    parallelism = step.parallelism
    if parallelism is None:
        parallelism = task_max_parallelism
    if parallelism == 1:

    Copy link
    Contributor

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Impact
    General
    Validate inputs before processing

    The validation for subworkflow should happen before processing any items or
    setting up parallelism to fail fast and avoid unnecessary setup.

    agents-api/agents_api/workflows/task_execution/helpers.py [275-280]

    +if isinstance(context.current_step.map, YieldStep):
    +    msg = "Subworkflow step not supported in parallel map reduce"
    +    raise ValueError(msg)
    +    
     workflow.logger.info(f"MapReduce step: Processing {len(items)} items")
     results = initial
     
    -if isinstance(context.current_step.map, YieldStep):
    -    msg = "Subworkflow step not supported in parallel map reduce"
    -    raise ValueError(msg)
    -
    • Apply this suggestion
    Suggestion importance[1-10]: 7

    __

    Why: Moving the validation check before any processing or logging improves efficiency by failing fast and prevents unnecessary operations when invalid input is detected.

    Medium
    Optimize execution flow validation

    The parallelism check should be moved before executing map reduce step to avoid
    unnecessary function calls. Move the parallelism validation to the start of the
    function.

    agents-api/agents_api/workflows/task_execution/init.py [308-314]

     if parallelism is None:
         parallelism = task_max_parallelism
     if parallelism == 1:
    -    result = await execute_map_reduce_step(
    +    return await execute_map_reduce_step(
             context=self.context,
             execution_input=self.context.execution_input,
             map_defn=step.map,
    • Apply this suggestion
    Suggestion importance[1-10]: 5

    __

    Why: Adding 'return' statement improves code clarity and efficiency by explicitly returning from the function when parallelism is 1, avoiding unnecessary variable assignment.

    Low

    Copy link
    Contributor

    @ellipsis-dev ellipsis-dev bot left a comment

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    👍 Looks good to me! Reviewed everything up to 92c5228 in 1 minute and 22 seconds

    More details
    • Looked at 171 lines of code in 3 files
    • Skipped 0 files when reviewing.
    • Skipped posting 6 drafted comments based on config settings.
    1. agents-api/agents_api/workflows/task_execution/__init__.py:308
    • Draft comment:
      Nice default update: if parallelism is None, it's now set to task_max_parallelism before checking for 1.
    • Reason this comment was not posted:
      Confidence changes required: 0% <= threshold 50%
      None
    2. agents-api/agents_api/workflows/task_execution/helpers.py:278
    • Draft comment:
      Check added to forbid YieldStep in parallel map-reduce; ensure this limitation is well documented.
    • Reason this comment was not posted:
      Confidence changes required: 0% <= threshold 50%
      None
    3. agents-api/agents_api/workflows/task_execution/__init__.py:307
    • Draft comment:
      When step.parallelism is None, task_max_parallelism is used. Ensure that task_max_parallelism is configured >1; otherwise, the assert in the parallel branch may trigger unexpectedly.
    • Reason this comment was not posted:
      Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 0% vs. threshold = 50%
      The comment is speculative - it warns about a potential issue if task_max_parallelism is configured incorrectly. However, I don't see any assert in the parallel branch in the code shown. The comment is asking the author to verify/ensure something rather than pointing out a clear issue. This violates our rules about speculative comments and verification requests.
      I could be missing context about where the assert is in the parallel branch code. Maybe there's an important assert in execute_map_reduce_step_parallel that would make this warning valid.
      Even if there is an assert somewhere, the comment is still asking for verification rather than pointing out a clear issue. If there's a real problem, the comment should state it directly rather than asking the author to check.
      Delete this comment. It's speculative and asks for verification rather than pointing out a clear issue. If there's a real problem with task_max_parallelism=1, that should be stated directly.
    4. agents-api/agents_api/workflows/task_execution/helpers.py:278
    • Draft comment:
      Good to see a check for subworkflow steps. Consider adding comments on why YieldStep is not allowed for parallel map reduce.
    • Reason this comment was not posted:
      Confidence changes required: 30% <= threshold 50%
      None
    5. agents-api/agents_api/workflows/task_execution/helpers.py:292
    • Draft comment:
      The reduction expression transformation is a bit opaque. Consider clarifying or refactoring the logic that builds the reducer lambda and then builds the final reduce expression.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    6. agents-api/tests/test_workflow_helpers.py:110
    • Draft comment:
      Tests correctly check error cases. Consider adding an additional test for when parallelism is not provided to verify that it defaults to task_max_parallelism and triggers parallel execution.
    • Reason this comment was not posted:
      Confidence changes required: 40% <= threshold 50%
      None

    Workflow ID: wflow_5A5LKiNb0d0i4NiY


    You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

    @creatorrr creatorrr merged commit ce13d55 into dev Feb 14, 2025
    14 checks passed
    @creatorrr creatorrr deleted the f/default-parallelism branch February 14, 2025 16:43
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    2 participants