[RFC] - Simplifying Workflow Design with AG2's High-Level API #1035

Open · wants to merge 12 commits into main

Conversation


@davorrunje commented Feb 19, 2025

This PR proposes a new global run function for running workflows and explains how it can be used to integrate with console and web applications using both sync and async APIs.

Motivational Examples

1. WebSocket-Based Workflow

Let's first consider the WebSocket example, where agents work together in a dynamic, interactive environment. The WebSocket server handles asynchronous communication between the agents and a client, allowing real-time interaction.

WebSocket Server Example with websockets Library

import asyncio
import websockets
import json
from typing import Any
from autogen import ConversableAgent, LLMConfig
from autogen.tools import tool

# Proposed high-level API from this RFC (import location assumed):
# a_run, AsyncRunResponseProtocol and InputResponseEvent are used below
from autogen import a_run, AsyncRunResponseProtocol, InputResponseEvent

# Set up LLMConfig as a context manager for agent configuration
llm_config = LLMConfig({"config_list": [{"api_key": "your-api-key", "model": "gpt-4o-mini"}]})

# Define tools for the agents
@tool(description="Search Wikipedia for the given query.")
def search_wikipedia(query: str) -> str:
    return "Leonardo da Vinci was an Italian polymath."

@tool(description="Send the answer to the user.")
def send_answer(answer: str) -> str:
    return f"Answer sent: {answer}"

async def run_agents_and_process_events():
    with llm_config:
        alice = ConversableAgent(system_message="You are a helpful assistant.")
        bob = ConversableAgent(system_message="You are a fact-checker.")

    alice.add_tool(search_wikipedia)
    bob.add_tool(send_answer)

    response = await a_run(alice, bob, message="Who is Leonardo da Vinci?")
    return response

async def process_event_loop(response: AsyncRunResponseProtocol, websocket: websockets.WebSocketServerProtocol):
    async for event in response.events:
        if event.type == "input_request":
            await websocket.send(json.dumps({"type": "input_request", "prompt": event.prompt}))
            user_input = await websocket.recv()
            event.respond(InputResponseEvent(value=user_input))

        elif event.type == "output":
            await websocket.send(json.dumps({"type": "output", "value": event.value}))

        elif event.type == "agent_message":
            await websocket.send(json.dumps({"type": "agent_message", "message": event.message}))

        elif event.type == "system":
            await websocket.send(json.dumps({"type": "system", "value": event.value}))

        elif event.type == "error":
            await websocket.send(json.dumps({"type": "error", "error": event.error}))

async def handler(websocket: websockets.WebSocketServerProtocol, path: str):
    response = await run_agents_and_process_events()
    await process_event_loop(response, websocket)

async def main():
    server = await websockets.serve(handler, "localhost", 8765)
    print("WebSocket server started on ws://localhost:8765")
    await server.wait_closed()

asyncio.run(main())

Explanation:

  • The WebSocket server listens for incoming client connections and processes events from agents Alice and Bob.
  • Events are processed asynchronously: input requests are forwarded to the client, which responds with input, and the server sends back output, agent messages, and system messages as they occur in the workflow. A minimal client-side sketch follows below.
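To make the event flow concrete, here is a minimal client-side sketch that connects to the server above and answers its input requests. The hard-coded reply is an illustrative assumption, not part of the RFC; only the standard websockets client API is used.

import asyncio
import json
import websockets

async def client():
    # Connect to the server from the example above
    async with websockets.connect("ws://localhost:8765") as ws:
        async for raw in ws:
            event = json.loads(raw)
            if event["type"] == "input_request":
                # The server reads the next websocket message as the user's input
                await ws.send("Please double-check that answer.")  # illustrative, hard-coded reply
            else:
                # output, agent_message, system, and error events are simply printed
                print(event)

asyncio.run(client())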

2. Synchronous CLI Application Using typer

The second example demonstrates a synchronous version of the same workflow, but in a CLI application built using the typer library.

CLI Application Code with typer and Synchronous Event Handling

import typer
from os import environ
from autogen import ConversableAgent, LLMConfig
from autogen.tools import tool

# Proposed high-level API from this RFC (import location assumed):
from autogen import run, InputResponseEvent

# Set up the typer app
app = typer.Typer()

# Set up LLMConfig as a context manager for agent configuration
llm_config = LLMConfig({"config_list": [{"api_key": environ["OPENAI_API_KEY"], "model": "gpt-4o-mini"}]})

# Define tools for the agents
@tool(description="Search Wikipedia for the given query.")
def search_wikipedia(query: str) -> str:
    return "Leonardo da Vinci was an Italian polymath."

@tool(description="Send the answer to the user.")
def send_answer(answer: str) -> str:
    return f"Answer sent: {answer}"

def run_agents_and_process_events():
    with llm_config:
        alice = ConversableAgent(system_message="You are a helpful assistant.")
        bob = ConversableAgent(system_message="You are a fact-checker.")

    alice.add_tool(search_wikipedia)
    bob.add_tool(send_answer)

    response = run(alice, bob, message="Who is Leonardo da Vinci?")
    return response

def process_event_loop(response):
    s = None
    for event in response.events:
        try:
            if event.type == "input_request":
                print(f"Input Request: {event.prompt}")
                s = input("Your response: ")
                event.respond(InputResponseEvent(value=s))

            elif event.type == "input_response":
                print(f"Input Response: {event.value}")

            elif event.type == "output":
                print(f"Output: {event.value}")

            elif event.type == "agent_message":
                print(f"Agent Message: {event.message}")

            elif event.type == "system":
                print(f"System: {event.value}")

            elif event.type == "error":
                print(f"Error occurred: {event.error}")

            else:
                print(f"Unrecognized event type: {event.type}")

        except Exception as e:
            print(f"Error processing event {event}: {str(e)}")

@app.command()
def run_workflow():
    response = run_agents_and_process_events()
    process_event_loop(response)

if __name__ == "__main__":
    app()

Explanation:

  • The typer library is used to build a simple CLI that prompts the user for input and displays agent messages.
  • This is a synchronous process where the program waits for user input before continuing the workflow and handling the next event.
  • The program processes each event step-by-step in the console, making it easy for developers to create interactive workflows in a synchronous environment.

3. Group Chat Manager for Handling Transitions Between Multiple Agents

Example Code with GroupChatManager:

from autogen import ConversableAgent, LLMConfig
from autogen.tools import tool
from autogen.chat_manager import GroupChatManager, Keyword

# Proposed global run function from this RFC (import location assumed)
from autogen import run, InputResponseEvent

# Set up LLMConfig for agent configuration
llm_config = LLMConfig({"api_type": "openai", "model": "gpt-4o-mini"})

# Define a tool to submit the plan
@tool
def submit_plan(plan: str) -> str:
    return f"Plan submitted: {plan}"

# Create the agents for the workflow
with llm_config:
    planner = ConversableAgent("You are a planner. Collaborate with teacher and reviewer to create lesson plans.")

    reviewer = ConversableAgent("You are a reviewer. Review lesson plans against 4th grade curriculum. Provide max 3 changes.")

    teacher = ConversableAgent(
        "You are a teacher. Choose topics and work with planner and reviewer. Say DONE! when finished.",
        tools=[submit_plan]
    )

    # Set up the Group Chat Manager with a termination condition (when teacher says DONE!)
    chat_manager = GroupChatManager(terminate_on=Keyword("DONE!"))

# Run the workflow with the three agents and the group chat manager
response = run(
    planner, reviewer, teacher,
    message="Create lesson plans for 4th grade.",
    chat_manager=chat_manager,
)

# Process the events generated during the workflow
def process_event_loop(response):
    s = None
    for event in response.events:
        try:
            if event.type == "input_request":
                print(f"Input Request: {event.prompt}")
                s = input("Your response: ")
                event.respond(InputResponseEvent(value=s))  # Responding to the request

            elif event.type == "input_response":
                print(f"Input Response: {event.value}")

            elif event.type == "output":
                print(f"Output: {event.value}")

            elif event.type == "agent_message":
                print(f"Agent Message: {event.message}")

            elif event.type == "system":
                print(f"System: {event.value}")

            elif event.type == "error":
                print(f"Error occurred: {event.error}")

            else:
                print(f"Unrecognized event type: {event.type}")

        except Exception as e:
            print(f"Error processing event: {str(e)}")

# Start the workflow and process events
process_event_loop(response)

Explanation:

  1. Setting Up the Agents:
    • Planner: The planner is responsible for collaborating with the teacher and reviewer to create the lesson plan.
    • Reviewer: The reviewer checks the lesson plan against a set curriculum and provides suggested changes (up to 3).
    • Teacher: The teacher selects topics and works with the planner and reviewer. Once the lesson plan is ready, the teacher will signal completion by saying "DONE!".
  2. The GroupChatManager:
    • The GroupChatManager is used to manage the transitions between agents in this group workflow. It listens for the Keyword("DONE!") from the teacher to terminate the workflow. This ensures that the process stops once the teacher is satisfied with the lesson plan.
  3. Event Handling:
    • The workflow progresses by processing events such as input requests, input responses, output, agent messages, and system messages. These events trigger interactions between the agents, with the teacher eventually signaling the end of the workflow by saying "DONE!".
  4. Run and Workflow Execution:
    • The run() function starts the workflow, passing the three agents (planner, reviewer, teacher) and the chat_manager. The agents communicate through the chat_manager, ensuring that transitions between them are handled automatically. The workflow will stop when the teacher says "DONE!".
  5. Event Processing:
    • The process_event_loop function handles each event generated during the workflow. For example, if an input request event is triggered, the program prompts the user for input and responds to the agent with the provided answer.

How It Works:

  1. The planner and teacher begin the lesson planning process.
  2. The reviewer provides feedback on the plan.
  3. The teacher interacts with both the planner and reviewer, making changes and finalizing the plan.
  4. The teacher signals the completion of the workflow by saying "DONE!".
  5. The Group Chat Manager ensures that the workflow ends when the teacher finishes, and the process terminates cleanly.

@davorrunje davorrunje marked this pull request as draft February 19, 2025 18:30
@davorrunje davorrunje self-assigned this Feb 20, 2025
@davorrunje davorrunje added the RFC label Feb 21, 2025
@davorrunje davorrunje requested a review from marklysze February 21, 2025 15:15

shawn-yang-google commented Feb 21, 2025

Thanks for Chi's invitation to the RFC. It is great work!

My thoughts:

  1. Here run covers the two-agent and group-agent scenarios.
    Can it also cover other conversational patterns like sequential and nested chat?

response = await a_run(alice, bob, message="Who is Leonardo da Vinci?")

  2. Does the order of the agent arguments matter?
    In Agent_A.initiate_chat(recipient=Agent_B), we explicitly define the starting point of the agents' conversation.
    Does run or a_run use the first agent as the starting point?

@marklysze

Nice work @davorrunje, @sternakt, @davorinrusevljan!

I really like the simplicity and logical approach to agent creation, tools, events, run response protocol, and the run command. The ability to iterate over the events and how this can drive any user interface is a major step forward.

After today's discussion, most of my questions were answered. For others to reference, here are some of the questions and my understanding of the answers.

Q: How will nested chats and other internal chats be catered for?
A: Chats within a run will be new runs with a new iterator (with new thread ids). Events will have associations with previous event(s) and a run (this will help with, for example, representing the flow diagrammatically or indenting inner chat activity in the console for nested/inner chats). Runs will have an association with parent runs.

Q: I see GroupChat is catered for with the NewGroupChatManager, how will other orchestrations be catered for?
A: The chat_manager property (which is likely to be changed to something like orchestration, or perhaps pattern) will take in an orchestration-based object which will be used to determine the next agent to speak.

Q: Is the ConversableAgent still used, and does it function as it currently does (e.g. internal generate_reply with registered reply functions)?
A: Yes, all ConversableAgent-based agents can be used.
A: In future iterations, a new, simpler Agent class will also be supported, as well as the ability to wrap and use 3rd-party agents.

Q: How are exceptions handled, are they the event.type == "error"?
A: Internally exceptions could be converted to error types or they could bubble up.

Q: How would a workflow be saved and resumed?
A: Importantly all key objects are serialisable, so the state can be persisted. To resume a run, a previous run's response object can be passed into a run and it will resume from that point onward.
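A rough sketch of resumption under that description, using the previous_run parameter from the run signature shown later in this thread; how the response object is persisted between runs is deliberately left out:

# First run: process events, persist state somewhere, then stop.
first_response = run(alice, bob, message="Who is Leonardo da Vinci?")

# Later (possibly after a restart): pass the previous response back in to resume from that point.
resumed_response = run(alice, bob, previous_run=first_response)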

Q: How will tools be executed?
A: A user proxy agent will be created in the run to execute tools. If an agent suggests a tool, the run will automatically use this generated agent to execute it, and then the flow will continue to the next determined agent. In this way, the generated user proxy agent is transparent.

Q: How do you pass multiple agents to the run function?
A: The run function takes a series of agents as its starting parameters. See the example of run's signature:

def run(
    *agents: Agent,
    message: Optional[str] = None,
    previous_run: Optional[RunResponseProtocol] = None,
    chat_manager: Optional[ChatManagerProtocol] = None,
    **kwargs: Any
) -> RunResponseProtocol:

Q: Will you have to implement the event processing loop each time?
A: No, a series of these will be created and available from the library, e.g. for the console or for websockets

Q: There's only one parameter set on ConversableAgent in the sample code.
A: The aim is to minimise the lines of code by exposing only the most important settings. The name of an agent can be determined and set programmatically, for example.

Q: Is the messages list still being used?
A: Yes, for this iteration it will be as is with the messages in memory. Future iterations may use a message broker that will support distribution/scalability.

Q: Termination not using a lambda/function?
A: Yes, this is important for serialisation, so in the example terminate_on will take serialisable objects and these can be amalgamated (e.g. you could terminate on multiple conditions).
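A hedged sketch of combining conditions; Keyword comes from the RFC example, while MaxTurns and the list form are assumptions about how serialisable conditions might be amalgamated:

# Keyword is from the RFC example; MaxTurns and the list form are assumed for illustration.
chat_manager = GroupChatManager(terminate_on=[Keyword("DONE!"), MaxTurns(20)])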

Q: Why is LLM Configuration being done this way?
A: So it is serialisable. Additionally, as it contains sensitive information, it will need the ability to keep that information out of anything persisted (TBD but will be handled).

Q: What if you don't specify a conversation pattern/orchestration?
A: It will default to round robin
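In other words, a sketch of a call without any orchestration specified:

# No chat_manager/orchestration supplied: agents take turns in round-robin order.
response = run(alice, bob, message="Who is Leonardo da Vinci?")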

@marklysze

... and here are some other questions:

Q: Is there a way to trigger the workflow based on an event (this may be outside the scope of this implementation), e.g. a workflow triggers when a new file is added to a directory or a new post on Discord.

Q: Is it possible to have asynchronous activity within the workflow (e.g. currently nested chats can be run asynchronously so multiple chats happen concurrently)? I assume this just means multiple runs and iterators, but good to understand.

Q: Can a run be terminated within, for example, an agent? Or what determines the end of the run?

Q: Assume multiple tool calls (e.g. an LLM recommends multiple tool calls) work as usual?

@AgentGenie

I would like to understand this better. @davorrunje Could you help clarify?

  1. Is this about running the workflow remotely, but not about agent communication in a distributed way, right?
  2. Would this design support streaming events as they occur instead of waiting for all events to complete?
  3. Would this support human-in-the-loop? E.g. when the user sends an additional message from the client side, would the workflow continue on the server side, or would it start a new workflow?


sonichi commented Mar 17, 2025

I'd like the signature and docstring of run to be refined so that all the orchestration patterns can be supported. For example, we should allow a "swarm-enabled" GroupChatManager to be auto-created, even though we may want to remove the "swarm" terminology for simplicity.
@marklysze could you summarize what is done differently in a swarm group chat vs. a non-swarm group chat and see if we can unify them in a single run API?

Besides the swarm group chat, another question is: how do we support sequential chats in run?


marklysze commented Mar 19, 2025

Differences:

Group Chat:

  1. Has speaker selection modes:
  • Auto
  • Round Robin
  • Random
  • Manual
  • [Callable]
  2. Can specify allowed/disallowed transitions
  3. Requires a GroupChatManager to be passed in (with Swarm you can pass the arguments to create one)
  4. Starts with initiate_chat and the GroupChatManager as the recipient
  5. Has an internal automatic speaker selection chat

Swarm:

  1. Shared context variables
  2. Starts with initiate_swarm_chat and an initial recipient (internally uses initiate_chat between a user agent and a GroupChatManager)
  3. Automatic function calling with an internally created tool executor agent
  4. Tool calls can return a SwarmResult which can determine the next agent
  5. Hand-offs with OnConditions, OnContextCondition, and AfterWorks
  6. Can pass in a user agent, otherwise it will create one
  7. AfterWork of SWARM_MANAGER resorts to a GroupChat with speaker selection of 'auto'
  8. Tools have a context_variables parameter injected if it exists


shawn-yang-google commented Mar 19, 2025

Expanding on my previous comment here: #1035 (comment)

My understanding

The current Run Interface in this documentation appears to support:

run(alice, bob, message="Who is Leonardo da Vinci?")

Which is equivalent to:

alice.initiate_chat(
    recipient=bob,
    message="Who is Leonardo da Vinci?",
)

It also appears to support:

run(
    planner, reviewer, teacher,
    message="Create lesson plans for 4th grade.",
    chat_manager=chat_manager,
)

Which is equivalent to:

chat_manager = GroupChatManager(
    groupchat=GroupChat(
        agents=[planner, reviewer, teacher,],
    )
)
planner.initiate_chat(
    chat_manager,
    message="Create lesson plans for 4th grade.",
)

Regarding my previous question 2:

"Does the order of agent arguments matter?"

It seems logical that the first agent listed is the one that initiates the chat.

Extending the Run Interface to support other Conversation Patterns:

  • Sequential Chat
    Sequential Chat also enables users to define a group of agents, similar to the Group Chat case.
    However, the key difference is that it requires users to define a list of messages and keyword arguments (kwargs) for the initiate_chats method (e.g., "summary_method").
planner.initiate_chats(
    [
        {
            "recipient": reviewer,
            "message": ...,
            "summary_method":...,
        },
        {
            "recipient": teacher,
            "message": ...,
            "summary_method":...,
        },
    ]
)

Considering the current Run interface design:

def run(
    *agents: Agent,
    message: Optional[str] = None,
    previous_run: Optional[RunResponseProtocol] = None,
    chat_manager: Optional[ChatManagerProtocol] = None,
    **kwargs: Any
) -> RunResponseProtocol:

A straightforward solution might be to introduce a new optional argument like initial_chats_kwargs. For instance:

def run(
    *agents: Agent,
    message: Optional[str] = None,
    previous_run: Optional[RunResponseProtocol] = None,
    chat_manager: Optional[ChatManagerProtocol] = None,
    initial_chats_kwargs: Optional[list[dict[str, Any]]] = None,
    **kwargs: Any
) -> RunResponseProtocol:

Users could then utilize it as follows:

run(
    planner,
    initial_chats_kwargs=[
        {
            "recipient": reviewer,
            "message": ...,
            ...
        },
        {
            "recipient": teacher,
            "message": ...,
            ...
        },
    ],
)

If we want to maintain the serializability of initial_chats_kwargs (i.e., avoid including agent instances directly), another approach could be to reuse the *agents argument:

run(
    planner, reviewer, teacher,
    initial_chats_kwargs=[
        {
            "message": ...,
            ...
        },
        {
            "message": ...,
            ...
        },
    ],
)

Here, planner (the first agent) would initiate the chats.
The first message in initial_chats_kwargs would be associated with reviewer (the second agent in the agents list), and the second message would be linked to teacher (the third agent).
However, this approach might be less intuitive for users.

With the addition of this new argument, we could also potentially support group-chat-in-a-sequential-chat:

run(
    planner,
    initial_chats_kwargs=[
        {
            "recipient": group_chat_manager,
            "message": ...,
            ...
        },
        {
            "recipient": group_chat_manager,
            "message": ...,
            ...
        },
    ],
)

Nested Chat (see documentation) is like Sequential Chat but utilizes the register_nested_chats API and a trigger argument. You define recipient Agents and keyword arguments for initiate_chat just as you would for Sequential Chat. The key difference is the trigger.

Here's an example of registering nested chats:

planner.register_nested_chats(
    [
        {
            "recipient": reviewer,
            "message": ...,
        },
        {
            "recipient": teacher,
            "message": ...,
            ...
        },
    ],
    trigger=lambda sender: sender not in [reviewer, teacher],
)

To use a trigger and create a nested chat, you would include it in the run function like this:

run(
    planner,
    initial_chats_kwargs=[
        {
            "recipient": reviewer,
            "message": ...,
            ...
        },
        {
            "recipient": teacher,
            "message": ...,
            ...
        },
    ],
    trigger=lambda sender: sender not in [reviewer, teacher],
)

Essentially, the presence of a trigger makes it a Nested Chat; its absence signifies a Sequential Chat.
