Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): properly exit a subworkflow with return step #1186

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

Ahmad-mtos
Copy link
Contributor

@Ahmad-mtos Ahmad-mtos commented Feb 25, 2025

PR Type

Bug fix, Enhancement, Tests


Description

  • Introduced a new WorkflowResult model to encapsulate workflow execution results.

  • Replaced PartialTransition with WorkflowResult across workflow handling methods.

  • Enhanced subworkflow handling to properly exit with return steps.

  • Updated and added tests to validate WorkflowResult integration.


Changes walkthrough 📝

Relevant files
Enhancement
tasks.py
Introduced `WorkflowResult` model for workflow results     

agents-api/agents_api/common/protocol/tasks.py

  • Added WorkflowResult model to represent workflow execution results.
  • Included metadata and return state in WorkflowResult.
  • +7/-0     
    create_task_execution.py
    Added `scope_id` to transition initialization                       

    agents-api/agents_api/routers/tasks/create_task_execution.py

    • Added scope_id to TransitionTarget initialization.
    +1/-0     
    __init__.py
    Refactored workflow execution to use `WorkflowResult`       

    agents-api/agents_api/workflows/task_execution/init.py

  • Replaced PartialTransition with WorkflowResult in step handlers.
  • Enhanced subworkflow return handling with WorkflowResult.
  • Updated run method to handle WorkflowResult transitions.
  • +50/-43 
    helpers.py
    Updated helpers to integrate `WorkflowResult`                       

    agents-api/agents_api/workflows/task_execution/helpers.py

  • Updated helper methods to return WorkflowResult.
  • Added handling for subworkflow returns in foreach and map-reduce
    steps.
  • +22/-6   
    Tests
    test_task_execution_workflow.py
    Updated tests for `WorkflowResult` integration                     

    agents-api/tests/test_task_execution_workflow.py

  • Updated tests to validate WorkflowResult usage.
  • Replaced PartialTransition assertions with WorkflowResult.
  • +14/-13 

    Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.

  • Important

    Introduces WorkflowResult to encapsulate workflow execution results, updating step handlers and tests to use this new class.

    • Behavior:
      • Introduces WorkflowResult class in tasks.py to encapsulate workflow execution results, including metadata and a returned flag.
      • Updates _handle_LogStep, _handle_ReturnStep, _handle_SwitchStep, _handle_IfElseWorkflowStep, _handle_ForeachStep, _handle_MapReduceStep, _handle_SleepStep, _handle_EvaluateStep, _handle_YieldStep, _handle_WaitForInputStep, _handle_PromptStep, _handle_SetStep, _handle_GetStep, _handle_ToolCallStep in task_execution/__init__.py to return WorkflowResult.
      • Modifies run() in task_execution/__init__.py to handle WorkflowResult and transition on returned flag.
    • Helpers:
      • Updates execute_foreach_step, execute_map_reduce_step, execute_map_reduce_step_parallel in helpers.py to return WorkflowResult.
    • Tests:
      • Updates tests in test_task_execution_workflow.py to assert WorkflowResult instead of PartialTransition.

    This description was created by Ellipsis for b4f5b24. It will automatically update as commits are pushed.


    EntelligenceAI PR Summary

    Purpose:

    • Enhance task execution workflow by standardizing clarity, consistency, and detail of execution results.

    Changes:

    • Enhancement: Introduced WorkflowResult class to encapsulate workflow execution results, replacing PartialTransition returns.
    • Refactor: Added scope_id to TransitionTarget for improved tracking of workflow execution context.
    • Test: Updated test cases to align with new execution logic, ensuring assertions for WorkflowResult.

    Impact:

    • Improved management of execution metadata and handling of sub-workflow returns, enhancing reliability and maintainability.

    Copy link
    Contributor

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Error Handling

    The new WorkflowResult error handling logic in the run() method may not properly propagate all error states and metadata when transitioning between workflow steps.

            await transition(context, type="error", output=error)
        msg = f"Step {type(context.current_step).__name__} threw error: {error}"
        raise ApplicationError(msg)
    
    self.outcome = outcome
    try:
        workflow_result = await self.handle_step(
            step=context.current_step,
        )
        state = workflow_result.state
    except BaseException as e:
        while isinstance(e, ActivityError) and getattr(e, "__cause__", None):
            e = e.__cause__
        if isinstance(e, CancelledError | AsyncioCancelledError):
            workflow.logger.info(f"Step {context.cursor.step} cancelled")
            if not getattr(e, "transitioned", False):
                await transition(context, type="cancelled", output="Workflow Cancelled")
            raise
        workflow.logger.error(f"Error in step {context.cursor.step}: {e}")
        if not getattr(e, "transitioned", False):
            await transition(context, type="error", output=str(e))
        msg = f"Step {type(context.current_step).__name__} threw error: {e}"
        raise ApplicationError(msg) from e
    
    if workflow_result.returned:
        await transition(
            self.context,
            output=state.output,
            type="finish" if self.context.is_main else "finish_branch",
            next=None,
            last_error=self.last_error,
        )
        return workflow_result
    Race Condition

    The parallel execution in execute_map_reduce_step_parallel() could have race conditions when checking for returned results across batches.

    if any(batch_result.returned for batch_result in batch_results):
        return next(batch_result for batch_result in batch_results if batch_result.returned)
    
    batch_results = [batch_result.state.output for batch_result in batch_results]
    Type Safety

    The new WorkflowResult model allows arbitrary metadata which could lead to type safety issues when processing workflow results.

    class WorkflowResult(BaseModel):
        """
        Represents the result of a workflow execution, including metadata about how it was completed.
        """
        state: PartialTransition
        returned: bool = False  # True if execution of a sub-workflow ended due to a return statement
        metadata: dict[str, Any] = Field(default_factory=dict)

    Copy link
    Contributor

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Impact
    Possible issue
    Handle batch processing failures gracefully

    The map-reduce step should handle the case where a batch item returns an error.
    Currently if any batch item fails, the entire operation fails. Consider adding
    error handling to collect partial results and continue processing remaining
    items.

    agents-api/agents_api/workflows/task_execution/helpers.py [355-362]

    -# Wait for all the batch items to complete
    -try:
    -    batch_results = await asyncio.gather(*batch_pending)
    +# Wait for all the batch items to complete, handling errors
    +batch_results = []
    +for future in asyncio.as_completed(batch_pending):
    +    try:
    +        result = await future
    +        if result.returned:
    +            return result
    +        batch_results.append(result.state.output)
    +    except Exception as e:
    +        workflow.logger.error(f"Batch item failed: {e}")
    +        continue
     
    -    if any(batch_result.returned for batch_result in batch_results):
    -        return next(batch_result for batch_result in batch_results if batch_result.returned)
    -    
    -    batch_results = [batch_result.state.output for batch_result in batch_results]
    -
    • Apply this suggestion
    Suggestion importance[1-10]: 8

    __

    Why: The suggestion improves error handling in the map-reduce batch processing by allowing partial failures without stopping the entire operation. This makes the code more resilient and prevents complete workflow failures due to individual batch errors.

    Medium
    • More

    Copy link
    Contributor

    CI Feedback 🧐

    A test triggered by this PR failed. Here is an AI-generated analysis of the failure:

    Action: Lint-And-Format

    Failed stage: Run stefanzweifel/git-auto-commit-action@v4 [❌]

    Failure summary:

    The action failed because there are uncommitted local changes in several files that would be
    overwritten by the checkout operation. Specifically:

  • agents-api/agents_api/common/protocol/tasks.py
  • agents-api/agents_api/workflows/task_execution/init.py
  • agents-api/agents_api/workflows/task_execution/helpers.py
  • agents-api/tests/test_task_execution_workflow.py

    These changes need to be either committed or stashed before switching branches.

  • Relevant error logs:
    1:  ##[group]Operating System
    2:  Ubuntu
    ...
    
    1366:  * [new branch]      f/chat-system-tool-calls -> origin/f/chat-system-tool-calls
    1367:  * [new branch]      f/configurable-temporal-search-attribute -> origin/f/configurable-temporal-search-attribute
    1368:  * [new branch]      f/dependency-injection -> origin/f/dependency-injection
    1369:  * [new branch]      f/grafana-traefik      -> origin/f/grafana-traefik
    1370:  * [new branch]      f/increase-test-coverage -> origin/f/increase-test-coverage
    1371:  * [new branch]      f/postgraphile         -> origin/f/postgraphile
    1372:  * [new branch]      f/rag-improv           -> origin/f/rag-improv
    1373:  * [new branch]      f/refactor-tests       -> origin/f/refactor-tests
    1374:  * [new branch]      f/tasks-validation-errors-enhancement -> origin/f/tasks-validation-errors-enhancement
    1375:  * [new branch]      main                   -> origin/main
    1376:  * [new branch]      restore-main           -> origin/restore-main
    1377:  * [new branch]      v0.3                   -> origin/v0.3
    1378:  * [new branch]      x/remove-connection-pool -> origin/x/remove-connection-pool
    1379:  * [new branch]      x/return-step          -> origin/x/return-step
    1380:  * [new tag]         v0.3.4                 -> v0.3.4
    1381:  error: Your local changes to the following files would be overwritten by checkout:
    1382:  agents-api/agents_api/common/protocol/tasks.py
    1383:  agents-api/agents_api/workflows/task_execution/__init__.py
    1384:  agents-api/agents_api/workflows/task_execution/helpers.py
    1385:  agents-api/tests/test_task_execution_workflow.py
    1386:  Please commit your changes or stash them before you switch branches.
    1387:  Aborting
    1388:  Error: Invalid status code: 1
    1389:  at ChildProcess.<anonymous> (/home/runner/work/_actions/stefanzweifel/git-auto-commit-action/v4/index.js:17:19)
    1390:  at ChildProcess.emit (node:events:519:28)
    1391:  at maybeClose (node:internal/child_process:1105:16)
    1392:  at ChildProcess._handle.onexit (node:internal/child_process:305:5) {
    1393:  code: 1
    1394:  }
    1395:  Error: Invalid status code: 1
    

    Copy link
    Contributor

    Walkthrough

    This PR enhances the task execution workflow by introducing a WorkflowResult class, replacing PartialTransition returns across various components. This change ensures a consistent and informative result structure, improving the workflow's ability to manage execution metadata and handle sub-workflow returns effectively. Additionally, test cases have been updated to align with the new workflow execution logic.

    Changes

    File(s) Summary
    agents-api/agents_api/common/protocol/tasks.py Introduced WorkflowResult class to encapsulate workflow execution results, including metadata and return status.
    agents-api/agents_api/routers/tasks/create_task_execution.py Added scope_id to TransitionTarget for better tracking of workflow execution context.
    agents-api/agents_api/workflows/task_execution/__init__.py Replaced PartialTransition returns with WorkflowResult in step handlers to include execution metadata and return status.
    agents-api/agents_api/workflows/task_execution/helpers.py Updated helper functions to return WorkflowResult instead of PartialTransition, ensuring consistent result handling.
    agents-api/tests/test_task_execution_workflow.py Modified test cases to assert WorkflowResult instead of PartialTransition, aligning with the updated workflow execution logic.
    Entelligence.ai can learn from your feedback. Simply add 👍 / 👎 emojis to teach it your preferences. More shortcuts below

    Emoji Descriptions:

    • ⚠️ Potential Issue - May require further investigation.
    • 🔒 Security Vulnerability - Fix to ensure system safety.
    • 💻 Code Improvement - Suggestions to enhance code quality.
    • 🔨 Refactor Suggestion - Recommendations for restructuring code.
    • ℹ️ Others - General comments and information.

    Interact with the Bot:

    • Send a message or request using the format:
      @bot + *your message*
    Example: @bot Can you suggest improvements for this code?
    
    • Help the Bot learn by providing feedback on its responses.
      @bot + *feedback*
    Example: @bot Do not comment on `save_auth` function !
    

    Comment on lines 285 to 287

    return result

    return WorkflowResult(state=PartialTransition(output=result))
    Copy link
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    The execute_map_reduce_step function has an empty line before the return statement that could lead to unreachable code and incorrect results being returned.

    📝 Committable Code Suggestion

    ‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

    Suggested change
    return result
    return WorkflowResult(state=PartialTransition(output=result))
    return WorkflowResult(state=PartialTransition(output=result))

    Copy link
    Contributor

    @ellipsis-dev ellipsis-dev bot left a comment

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    👍 Looks good to me! Reviewed everything up to b4f5b24 in 2 minutes and 17 seconds

    More details
    • Looked at 567 lines of code in 5 files
    • Skipped 0 files when reviewing.
    • Skipped posting 17 drafted comments based on config settings.
    1. agents-api/agents_api/common/protocol/tasks.py:139
    • Draft comment:
      Introduces WorkflowResult with a concise docstring. Consider adding more detailed type hints or examples if future extensions are expected.
    • Reason this comment was not posted:
      Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50%
      The existing type hints are already quite specific - state is a PartialTransition, returned is a bool with a clear purpose, and metadata is a flexible dict[str, Any]. The comment doesn't point to any specific field that needs better typing. It's a speculative suggestion about "future extensions" without identifying a concrete issue.
      Perhaps there could be specific use cases where more detailed typing of the metadata dictionary would be valuable. Maybe the comment author saw patterns in how metadata is used elsewhere in the codebase.
      Without specific evidence of how the metadata is used or what future extensions are anticipated, this is just speculation. The current typing is appropriate for the current implementation.
      The comment should be deleted as it's speculative and doesn't identify a concrete issue with the current implementation. The existing type hints are appropriate and clear.
    2. agents-api/agents_api/routers/tasks/create_task_execution.py:77
    • Draft comment:
      Adding 'scope_id=uuid7()' in TransitionTarget improves uniqueness. Verify that uuid7 meets the deterministic needs (if any) in workflow transitions.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    3. agents-api/agents_api/workflows/task_execution/__init__.py:239
    • Draft comment:
      In _handle_ReturnStep the workflow now returns a WorkflowResult with returned=True. Confirm that calling transition before returning is safe here; the new implementation changes the control flow.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      The comment is asking the PR author to confirm the safety of a change in control flow, which violates the rule against asking for confirmation of intentions or safety. It does not provide a specific suggestion or ask for a test to be written. Therefore, this comment should be removed.
    4. agents-api/agents_api/workflows/task_execution/__init__.py:658
    • Draft comment:
      The handle_step method now explicitly returns a WorkflowResult. This clearer typing is a best practice; ensure all calling methods properly unwrap the 'state' field as needed.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      The comment is asking the author to ensure that all calling methods properly unwrap a field. This is a general request for verification and does not provide a specific suggestion or point out a specific issue in the code. It violates the rule against asking the author to ensure behavior is intended or tested.
    5. agents-api/agents_api/workflows/task_execution/__init__.py:790
    • Draft comment:
      After checking if workflow_result.returned is true, a transition is executed before returning. Confirm that this transition is intended to be executed synchronously and that its side effects are expected.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      The comment is asking the PR author to confirm their intention regarding the execution of a transition. This violates the rule against asking the author to confirm their intention or ensure behavior is intended. The comment does not provide a specific code suggestion or point out a clear issue.
    6. agents-api/agents_api/workflows/task_execution/helpers.py:227
    • Draft comment:
      The execute_foreach_step function now checks for 'result.returned' in the loop. Ensure that returning early from a foreach iteration correctly propagates the WorkflowResult upstream.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      The comment is asking the author to ensure that a specific behavior is correct, which violates the rule against asking the author to ensure behavior is intended. It doesn't provide a specific suggestion or point out a clear issue.
    7. agents-api/tests/test_task_execution_workflow.py:100
    • Draft comment:
      Test assertions now expect a WorkflowResult instead of a PartialTransition. This is good for clear contract testing; verify that all tests consistently check both state and returned flag.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      This comment is asking the PR author to verify that all tests are consistent with the new expectations. It doesn't provide a specific suggestion or point out a specific issue, which violates the rule against asking the author to ensure things are tested or verified.
    8. agents-api/tests/test_task_execution_workflow.py:667
    • Draft comment:
      Test for yield step expressions checks transition_to tuple with generated scope_id. Consider documenting the expected behavior for yield steps, since the scope_id is generated dynamically.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    9. agents-api/tests/test_task_execution_workflow.py:1417
    • Draft comment:
      The tool call evaluation for function tool now includes a generated call ID ('XXXX') in the output. Ensure this mechanism is robust and that the test accounts for any future changes in call ID generation.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    10. agents-api/tests/test_task_execution_workflow.py:100
    • Draft comment:
      Instead of asserting full equality with a WorkflowResult, consider comparing only its essential fields (like output and type). This makes the test less brittle if additional metadata is added later.
    • Reason this comment was not posted:
      Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 30% vs. threshold = 50%
      This is a reasonable suggestion about test brittleness - comparing only essential fields can make tests more maintainable. However, in this case, WorkflowResult seems to be a simple wrapper with just a state field, and the test is explicitly verifying the exact structure that should be returned. Making the assertion more granular could actually make the test less clear about the expected return type.
      The comment has a valid point about test brittleness. If WorkflowResult gains more fields in the future, tests would need updates even if the core behavior hasn't changed.
      While the brittleness concern is valid, the current assertion clearly shows the exact expected return type and structure, which is valuable for test readability and documentation. The suggested change could obscure the fact that a WorkflowResult is expected.
      The comment should be deleted. While it makes a reasonable suggestion, the current assertion style has benefits in terms of clarity and documentation that outweigh the brittleness concerns.
    11. agents-api/tests/test_task_execution_workflow.py:217
    • Draft comment:
      Use substring or regex matching for the exception message instead of checking full equality. This avoids brittleness if the formatting or additional context is added in the error message.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    12. agents-api/tests/test_task_execution_workflow.py:672
    • Draft comment:
      When using multiple side effects with workflow.execute_activity, ensure the order and expected calls are documented. A change in call order may break the test unexpectedly.
    • Reason this comment was not posted:
      Comment did not seem useful. Confidence is useful = 0% <= threshold 50%
      This comment is asking the PR author to ensure that the order and expected calls are documented, which is similar to asking them to double-check or ensure something. This violates the rule against asking the author to ensure behavior is intended or tested. The comment does not provide a specific code suggestion or ask for a specific test to be written, which would have been acceptable.
    13. agents-api/tests/test_task_execution_workflow.py:1267
    • Draft comment:
      The log step test checks the output of a log expression. Consider adding a scenario with multiple log messages or edge cases to better verify the aggregation behavior if it's expected to evolve.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    14. agents-api/tests/test_task_execution_workflow.py:1425
    • Draft comment:
      The test for evaluating tool call expressions depends on generate_call_id returning a fixed value ('XXXX'). Consider abstracting or injecting the call ID generator to avoid tight coupling between tests and implementation.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    15. agents-api/tests/test_task_execution_workflow.py:1500
    • Draft comment:
      In the yield expressions test, the assertion extracts the dynamic scope_id from the transition target. Consider isolating dynamic components in the assertion so that changes in runtime-generated values don't break the test unnecessarily.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.
    16. agents-api/tests/test_task_execution_workflow.py:1514
    • Draft comment:
      Overall, the test suite provides comprehensive coverage of the workflow step handling. As the workflow complexity grows (e.g., with nested foreach or map-reduce steps), consider adding integration tests to ensure long-term maintainability.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    17. agents-api/agents_api/workflows/task_execution/helpers.py:123
    • Draft comment:
      Typo: The variable 'seprated_workflow_name' should be spelled 'separated_workflow_name' for consistency and clarity.
    • Reason this comment was not posted:
      Comment was not on a location in the diff, so it can't be submitted as a review comment.

    Workflow ID: wflow_I5YId3Oq3nJNKWmE


    You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

    batch_result for batch_result in batch_results if batch_result.returned
    )

    batch_results = [batch_result.state.output for batch_result in batch_results]
    Copy link
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    can this be refactored somehow to not traverse batch_results twice?

    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    2 participants