Create Haystack and Langchain Nodes #1420

Open: wants to merge 25 commits into base: branch-24.06
Changes from 13 commits
Commits
25 commits
8d2ca24
Added haystack llm orch to agents pipeline
bsuryadevara Dec 11, 2023
81a0e7f
Trivial changes
bsuryadevara Dec 11, 2023
a7ab077
Trivial changes
bsuryadevara Dec 11, 2023
96d3887
Updated haystack calculator node
bsuryadevara Dec 11, 2023
6247ea3
Updated haystack calculator node
bsuryadevara Dec 11, 2023
5034a90
Added haystack llm agent
bsuryadevara Dec 11, 2023
59f0056
Merge branch 'branch-24.03' into create-haystack-node
bsuryadevara Dec 11, 2023
1820796
Added tests to haystack agent node
bsuryadevara Dec 12, 2023
e28caf5
Merge branch 'create-haystack-node' of github.com:bsuryadevara/Morphe…
bsuryadevara Dec 12, 2023
dbc03dc
Updated agents kafka pipeline
bsuryadevara Dec 13, 2023
28e24db
Merge remote-tracking branch 'upstream/branch-24.03' into create-hays…
bsuryadevara Dec 13, 2023
ef223e8
Removed haystack full package installation
bsuryadevara Dec 13, 2023
c17dbef
Removed haystack full package installation
bsuryadevara Dec 13, 2023
01ff40b
Removed haystack dependency from dev yaml
bsuryadevara Dec 13, 2023
067b15e
Merge branch 'branch-24.03' into create-haystack-node
bsuryadevara Dec 18, 2023
40178f8
Added llamaindex agent functionality
bsuryadevara Dec 20, 2023
38bb8fa
Merge branch 'branch-24.03' into create-haystack-node
bsuryadevara Dec 20, 2023
ea57b44
Updated openai chat service tests
bsuryadevara Dec 26, 2023
47abbbb
Updated openai client tests
bsuryadevara Jan 2, 2024
a95e22c
Merge branch 'branch-24.03' into create-haystack-node
bsuryadevara Jan 2, 2024
4dfd04a
Updated copyright header
bsuryadevara Jan 3, 2024
f9ad5fa
Merge branch 'create-haystack-node' of github.com:bsuryadevara/Morphe…
bsuryadevara Jan 3, 2024
e3af515
Updated copyright header
bsuryadevara Jan 3, 2024
211d66f
Fixed flake8 errors with openai module
bsuryadevara Jan 3, 2024
363e2b9
Fixed flake8 errors with openai module
bsuryadevara Jan 3, 2024
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -123,6 +123,7 @@ dependencies:
- pip:
# Add additional dev dependencies here
- databricks-connect
- farm-haystack==1.22.1
- milvus==2.3.2
- pyarrow_hotfix # CVE-2023-47248. See morpheus/__init__.py for more details
- pymilvus==2.3.2
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_examples.yml
@@ -59,6 +59,7 @@ dependencies:

####### Pip Dependencies (keep sorted!) #######
- pip:
- farm-haystack==1.22.1
- google-search-results==2.4
- grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
- nemollm
24 changes: 23 additions & 1 deletion examples/llm/agents/README.md
@@ -51,7 +51,7 @@ The pipeline supports different agent types, each influencing the pattern for in
Depending on the problem at hand, various tools can be provided to LLM agents, such as internet searches, VDB retrievers, calculators, Wikipedia, etc. In this example, we'll use the internet search tool and an llm-math tool, allowing the LLM agent to perform Google searches and solve math equations.

### LLM Library
The pipeline utilizes the Langchain library to run LLM agents, enabling their execution directly within a Morpheus pipeline. This approach reduces the overhead of migrating existing systems to Morpheus and eliminates the need to replicate work done by popular LLM libraries like llama-index and Haystack.
The pipeline utilizes the Langchain and Haystack libraries to run LLM agents, enabling their execution directly within a Morpheus pipeline. This approach reduces the overhead of migrating existing systems to Morpheus and eliminates the need to replicate work done by popular LLM libraries such as llama-index.

## Pipeline Implementation
- **InMemorySourceStage**: Manages LLM queries in a DataFrame.
@@ -90,6 +90,17 @@ SerpApi API key. Set the API key as an environment variable using the following
export SERPAPI_API_KEY="<YOUR_SERPAPI_API_KEY>"
```

**Serper Dev API Key**

Go to [SerperDev](https://serper.dev/login) to register and create an account. Once registered, obtain your
Serper Dev API key. Set the API key as an environment variable using the following command:

```bash
export SERPERDEV_API_KEY="<YOUR_SERPERDEV_API_KEY>"
```

Note: This is required when using the Haystack LLM orchestration framework in the pipeline.
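Before launching the pipeline, it can save a failed run to verify the keys are actually exported. The helper below is a hypothetical convenience, not part of this PR; it assumes the langchain path uses SerpApi and that an OpenAI key is needed in both cases, as the surrounding README suggests:

```python
import os


def check_api_keys(llm_orch: str) -> list[str]:
    """Return the names of required API keys missing from the environment.

    Assumes OPENAI_API_KEY is always needed, SERPERDEV_API_KEY for the
    haystack orchestrator, and SERPAPI_API_KEY otherwise.
    """
    required = ["OPENAI_API_KEY"]
    if llm_orch == "haystack":
        required.append("SERPERDEV_API_KEY")
    else:
        required.append("SERPAPI_API_KEY")
    return [key for key in required if key not in os.environ]


if __name__ == "__main__":
    missing = check_api_keys("haystack")
    if missing:
        print(f"Missing environment variables: {missing}")
```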

#### Install Dependencies

Install the required dependencies.
@@ -144,6 +155,12 @@ python examples/llm/main.py agents simple [OPTIONS]
- `--repeat_count INTEGER RANGE`
- **Description**: Number of times to repeat the input query. Useful for testing performance.
- **Default**: `1`

- `--llm_orch TEXT`
- **Choice**: `[haystack|langchain]`
- **Description**: The LLM orchestration framework to use in the pipeline.
- **Default**: `langchain`

- `--help`
- **Description**: Show the help message with options and commands details.

@@ -193,6 +210,11 @@ python examples/llm/main.py agents kafka [OPTIONS]
- **Description**: The name of the model to use in OpenAI.
- **Default**: `gpt-3.5-turbo-instruct`

- `--llm_orch TEXT`
- **Choice**: `[haystack|langchain]`
- **Description**: The LLM orchestration framework to use in the pipeline.
- **Default**: `langchain`

- `--help`
- **Description**: Show the help message with options and commands details.

43 changes: 19 additions & 24 deletions examples/llm/agents/kafka_pipeline.py
@@ -15,16 +15,11 @@
import logging
import time

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAIChat

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.haystack_agent_node import HaystackAgentNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
@@ -34,36 +29,36 @@
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAIChat(model=model_name, temperature=0)
from ..common.utils import build_haystack_agent
from ..common.utils import build_langchain_agent_executor

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor
logger = logging.getLogger(__name__)


def _build_engine(model_name: str) -> LLMEngine:
def _build_engine(model_name: str, llm_orch: str) -> LLMEngine:

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))
agent_node = None

if llm_orch == "langchain":
agent_node = LangChainAgentNode(agent_executor=build_langchain_agent_executor(model_name=model_name))
elif llm_orch == "haystack":
agent_node = HaystackAgentNode(agent=build_haystack_agent(model_name=model_name))
else:
raise RuntimeError(f"LLM orchestration framework '{llm_orch}' is not supported yet.")

engine.add_node("agent", inputs=[("/extracter")], node=agent_node)

engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler())
engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

return engine


def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, model_name: str) -> float:
def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, model_name: str,
llm_orch: str) -> float:
config = Config()
config.mode = PipelineModes.OTHER

@@ -85,7 +80,7 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i

# pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))
pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name, llm_orch=llm_orch)))

sink = pipe.add_stage(InMemorySinkStage(config))

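The rewritten `_build_engine` picks the agent node from the `llm_orch` flag. The real builders, `build_langchain_agent_executor` and `build_haystack_agent`, live in `examples/llm/agents/common/utils.py`, which this diff does not show, so the sketch below stubs them out; only the dispatch logic mirrors the PR's if/elif/else:

```python
# Stub factories standing in for the helpers in common/utils.py (not shown
# in this hunk); the real ones wire an OpenAI LLM to search and math tools.
def build_langchain_agent_executor(model_name: str) -> str:
    return f"langchain-agent({model_name})"


def build_haystack_agent(model_name: str) -> str:
    return f"haystack-agent({model_name})"


def build_agent_node(model_name: str, llm_orch: str) -> str:
    """Select the agent implementation by orchestration framework name,
    mirroring the dispatch in the PR's _build_engine."""
    if llm_orch == "langchain":
        return build_langchain_agent_executor(model_name)
    if llm_orch == "haystack":
        return build_haystack_agent(model_name)
    raise RuntimeError(f"LLM orchestration framework '{llm_orch}' is not supported yet.")
```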
1 change: 1 addition & 0 deletions examples/llm/agents/requirements.yaml
@@ -31,5 +31,6 @@ dependencies:

####### Pip Dependencies (keep sorted!) #######
- pip:
- farm-haystack==1.22.1
- grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
- nemollm
12 changes: 12 additions & 0 deletions examples/llm/agents/run.py
@@ -57,6 +57,12 @@ def run():
type=click.IntRange(min=1),
help="Number of times to repeat the input query. Useful for testing performance.",
)
@click.option(
"--llm_orch",
default="langchain",
type=click.Choice(["haystack", "langchain"], case_sensitive=False),
help="The LLM orchestration framework to use in the pipeline.",
)
def simple(**kwargs):

from .simple_pipeline import pipeline as _pipeline
@@ -91,6 +97,12 @@ def simple(**kwargs):
default='gpt-3.5-turbo-instruct',
help="The name of the model to use in OpenAI",
)
@click.option(
"--llm_orch",
default="langchain",
type=click.Choice(["haystack", "langchain"], case_sensitive=False),
help="The LLM orchestration framework to use in the pipeline.",
)
def kafka(**kwargs):

from .kafka_pipeline import pipeline as _pipeline
59 changes: 28 additions & 31 deletions examples/llm/agents/simple_pipeline.py
@@ -15,18 +15,13 @@
import logging
import time

from langchain import OpenAI
from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor

import cudf

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.haystack_agent_node import HaystackAgentNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
@@ -38,42 +33,40 @@
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAI(model=model_name, temperature=0)
from ..common.utils import build_haystack_agent
from ..common.utils import build_langchain_agent_executor

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor
logger = logging.getLogger(__name__)


def _build_engine(model_name: str) -> LLMEngine:
def _build_engine(model_name: str, llm_orch: str) -> LLMEngine:

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))
agent_node = None

if llm_orch == "langchain":
agent_node = LangChainAgentNode(agent_executor=build_langchain_agent_executor(model_name=model_name))
elif llm_orch == "haystack":
agent_node = HaystackAgentNode(agent=build_haystack_agent(model_name=model_name))
else:
raise RuntimeError(f"LLM orchestration framework '{llm_orch}' is not supported yet.")

engine.add_node("agent", inputs=[("/extracter")], node=agent_node)

engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

return engine


def pipeline(
num_threads: int,
pipeline_batch_size,
model_max_batch_size,
model_name,
repeat_count,
) -> float:
def pipeline(num_threads: int,
pipeline_batch_size: int,
model_max_batch_size: int,
model_name: str,
repeat_count: int,
llm_orch: str) -> float:
config = Config()
config.mode = PipelineModes.OTHER

@@ -85,8 +78,12 @@ def pipeline(
config.edge_buffer_size = 128

source_dfs = [
cudf.DataFrame(
{"questions": ["Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?"]})
cudf.DataFrame({
"questions": [
"Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?",
"Who is the 7th president of the United States?"
]
})
]

completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["questions"], }}
@@ -100,7 +97,7 @@

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))
pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name, llm_orch=llm_orch)))

sink = pipe.add_stage(InMemorySinkStage(config))

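The simple pipeline feeds a DataFrame of questions into the engine, and the extracter node pulls out the columns named in the completion task's `input_keys`. A minimal sketch of that relationship, with plain dicts standing in for cudf DataFrames (which need a GPU) and a hypothetical `extract_inputs` standing in for `ExtracterNode`:

```python
# Source payload: one column of questions, as in the updated simple_pipeline.
source_df = {
    "questions": [
        "Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?",
        "Who is the 7th president of the United States?",
    ]
}

# Completion task attached to each control message.
completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["questions"]}}


def extract_inputs(df: dict, task: dict) -> dict:
    """Hypothetical stand-in for ExtracterNode: pull the listed columns."""
    return {key: df[key] for key in task["task_dict"]["input_keys"]}


inputs = extract_inputs(source_df, completion_task)
```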