import asyncio
import json
import os
from typing import Any, Callable, List, Optional
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import NOT_GIVEN, AsyncOpenAI
load_dotenv()
MODEL = os.getenv("LLM_MODEL")
BASE_URL = os.getenv("OPENAI_API_BASE")
API_KEY = os.getenv("OPENAI_API_KEY")
# System prompt that guides the LLM's behavior and capabilities
# This helps the model understand its role and available tools
SYSTEM_PROMPT = """You are a helpful assistant capable of accessing external functions and engaging in casual chat. Use the responses from these function calls to provide accurate and informative answers. The answers should be natural and hide the fact that you are using tools to access real-time information. Guide the user about available tools and their capabilities. Always utilize tools to access real-time information when required. Engage in a friendly manner to enhance the chat experience.
# Tools
{tools}
# Notes
- Ensure responses are based on the latest information available from function calls.
- Maintain an engaging, supportive, and friendly tone throughout the dialogue.
- Always highlight the potential of available tools to assist users comprehensively."""
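# The {tools} placeholder above is filled in at runtime with one line per tool,
# e.g. (illustrative, for a hypothetical SQLite tool):
#   - read_query: Execute a SELECT query on the SQLite database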
# Initialize OpenAI client with HuggingFace inference API
# This allows us to use Llama models through HuggingFace's API
client = AsyncOpenAI(
base_url=BASE_URL,
api_key=API_KEY,
)
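# Example .env (illustrative values only; the exact model name and endpoint
# depend on your provider):
#   LLM_MODEL=meta-llama/Llama-3.3-70B-Instruct
#   OPENAI_API_BASE=https://api-inference.huggingface.co/v1/
#   OPENAI_API_KEY=hf_...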
class MCPClient:
"""
    A client class for interacting with an MCP (Model Context Protocol) server.
This class manages the connection and communication with the SQLite database through MCP.
"""
def __init__(self, server_params: StdioServerParameters):
"""Initialize the MCP client with server parameters"""
self.server_params = server_params
self.session = None
self._client = None
async def __aenter__(self):
"""Async context manager entry"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
if self.session:
await self.session.__aexit__(exc_type, exc_val, exc_tb)
if self._client:
await self._client.__aexit__(exc_type, exc_val, exc_tb)
async def connect(self):
"""Establishes connection to MCP server"""
        # Manually enter the stdio transport and session context managers so
        # their lifetimes span connect() through __aexit__
        self._client = stdio_client(self.server_params)
        self.read, self.write = await self._client.__aenter__()
        session = ClientSession(self.read, self.write)
        self.session = await session.__aenter__()
        await self.session.initialize()
async def get_available_tools(self) -> List[Any]:
"""
Retrieve a list of available tools from the MCP server.
"""
if not self.session:
raise RuntimeError("Not connected to MCP server")
        tools_response = await self.session.list_tools()
        return tools_response.tools
    def call_tool(self, tool_name: str) -> Callable:
"""
Create a callable function for a specific tool.
This allows us to execute database operations through the MCP server.
Args:
tool_name: The name of the tool to create a callable for
Returns:
A callable async function that executes the specified tool
"""
if not self.session:
raise RuntimeError("Not connected to MCP server")
        async def _tool_callable(*args, **kwargs):
            # Forward keyword arguments to the MCP server and return the text
            # content of the first result block
            response = await self.session.call_tool(tool_name, arguments=kwargs)
            return response.content[0].text

        return _tool_callable
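
# Usage sketch (illustrative; assumes the mcp/sqlite server exposes a
# "read_query" tool that accepts a "query" parameter):
#
#   async with MCPClient(server_params) as mcp_client:
#       read_query = mcp_client.call_tool("read_query")
#       rows = await read_query(query="SELECT name FROM sqlite_master")
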
async def agent_loop(query: str, tools: dict, messages: Optional[List[dict]] = None):
"""
Main interaction loop that processes user queries using the LLM and available tools.
This function:
1. Sends the user query to the LLM with context about available tools
2. Processes the LLM's response, including any tool calls
3. Returns the final response to the user
Args:
query: User's input question or command
tools: Dictionary of available database tools and their schemas
messages: List of messages to pass to the LLM, defaults to None
"""
messages = (
[
{
"role": "system",
"content": SYSTEM_PROMPT.format(
tools="\n- ".join(
[
f"{t['name']}: {t['schema']['function']['description']}"
for t in tools.values()
]
)
), # Creates System prompt based on available MCP server tools
},
]
if messages is None
else messages # reuse existing messages if provided
)
# add user query to the messages list
messages.append({"role": "user", "content": query})
# Query LLM with the system prompt, user query, and available tools
first_response = await client.chat.completions.create(
model=MODEL,
messages=messages,
tools=([t["schema"] for t in tools.values()] if len(tools) > 0 else None),
max_tokens=4096,
temperature=0,
)
# detect how the LLM call was completed:
# tool_calls: if the LLM used a tool
# stop: If the LLM generated a general response, e.g. "Hello, how can I help you today?"
stop_reason = None
try:
stop_reason = (
"tool_calls"
if first_response.choices[0].message.tool_calls is not None
else first_response.choices[0].finish_reason
)
except Exception as e:
print(f">>> Exception: {e}")
print(first_response)
    if stop_reason == "tool_calls":
        # Append the assistant message (which carries the tool calls) once,
        # before any of the tool results
        messages.append(first_response.choices[0].message)
        # Extract tool use details from the response and execute each call
        for tool_call in first_response.choices[0].message.tool_calls:
            arguments = (
                json.loads(tool_call.function.arguments)
                if isinstance(tool_call.function.arguments, str)
                else tool_call.function.arguments
            )
            # Call the tool with the arguments using our callable initialized in the tools dict
            tool_result = await tools[tool_call.function.name]["callable"](**arguments)
            # Add the tool result to the messages list
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_call.function.name,
                    "content": json.dumps(tool_result),
                }
            )
        # Query the LLM again, now with the tool results in context
        new_response = await client.chat.completions.create(
            model=MODEL,
            messages=messages,
        )
elif stop_reason == "stop":
# If the LLM stopped on its own, use the first response
new_response = first_response
else:
raise ValueError(f"Unknown stop reason: {stop_reason}")
# Add the LLM response to the messages list
messages.append(
{"role": "assistant", "content": new_response.choices[0].message.content}
)
# Return the LLM response and messages
return new_response.choices[0].message.content, messages
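
# Usage sketch (illustrative): a first turn with no prior history, then a
# follow-up turn that reuses the returned message list for context:
#
#   response, messages = await agent_loop("What tables exist?", tools)
#   response, messages = await agent_loop("Show 5 rows from one", tools, messages)
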
async def main():
"""
Main function that sets up the MCP server, initializes tools, and runs the interactive loop.
The server is run in a Docker container to ensure isolation and consistency.
"""
# Configure Docker-based MCP server for SQLite
server_params = StdioServerParameters(
command="docker",
args=[
"run",
"--rm", # Remove container after exit
"-i", # Interactive mode
"-v", # Mount volume
"mcp-test:/mcp", # Map local volume to container path
"mcp/sqlite", # Use SQLite MCP image
"--db-path",
"/mcp/test.db", # Database file path inside container
],
env=None,
)
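    # Note: Docker creates the "mcp-test" named volume on first use if it does
    # not already exist, so the SQLite database file persists across runs
    # (assumes the mcp/sqlite image is available locally or pullable).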
# Start MCP client and create interactive session
async with MCPClient(server_params) as mcp_client:
# Get available database tools and prepare them for the LLM
mcp_tools = await mcp_client.get_available_tools()
# Convert MCP tools into a format the LLM can understand and use
tools = {
tool.name: {
"name": tool.name,
"callable": mcp_client.call_tool(
tool.name
), # returns a callable function for the rpc call
"schema": {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
},
}
for tool in mcp_tools
if tool.name
!= "list_tables" # Excludes list_tables tool as it has an incorrect schema
}
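        # Each entry pairs an executable callable with an OpenAI
        # function-calling schema; roughly (illustrative, for a hypothetical
        # "read_query" tool):
        #   {"name": "read_query",
        #    "callable": <async function>,
        #    "schema": {"type": "function",
        #               "function": {"name": "read_query",
        #                            "description": "...",
        #                            "parameters": {...}}}}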
# Start interactive prompt loop for user queries
messages = None
while True:
try:
# Get user input and check for exit commands
user_input = input("\nEnter your prompt (or 'quit' to exit): ")
if user_input.lower() in ["quit", "exit", "q"]:
break
# Process the prompt and run agent loop
response, messages = await agent_loop(user_input, tools, messages)
print("\nResponse:", response)
# print("\nMessages:", messages)
except KeyboardInterrupt:
print("\nExiting...")
break
except Exception as e:
print(f"\nError occurred: {e}")
if __name__ == "__main__":
asyncio.run(main())