implement openmanus client based on mcp

This commit is contained in:
gantnocap 2025-03-13 18:52:55 +08:00
parent 792dc664a7
commit 7baab6ad95
5 changed files with 232 additions and 196 deletions

View File

@ -111,10 +111,12 @@ After clicking on the hammer icon, you should see tools listed:
### 💻 2. Testing with simple Client Example
Check out `openmanus_client_example.py` to test the openmanus server using the MCP client.
Check out `openmanus_client.py` to test the openmanus server using the MCP client.
#### Demo display
https://github.com/user-attachments/assets/aeacd93d-9bec-46d1-831b-20e898c7507b
```
uv run openmanus_server/openmanus_client_example.py openmanus_server/openmanus_server.py
python openmanus_server/openmanus_client.py
```

View File

@ -1,3 +1,4 @@
# Core dependencies
mcp
httpx>=0.27.0
tomli>=2.0.0

View File

@ -0,0 +1,224 @@
import ast
import asyncio
import os
import sys
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Optional
import tomli
from colorama import Fore, init
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI
# Initialize colorama
def init_colorama():
init(autoreset=True)
# Load config
def load_config():
config_path = Path(__file__).parent.parent / "config" / "config.toml"
try:
with open(config_path, "rb") as f:
return tomli.load(f)
except FileNotFoundError:
print(f"Error: config.toml not found at {config_path}")
sys.exit(1)
except tomli.TOMLDecodeError as e:
print(f"Error: Invalid TOML in config.toml: {e}")
sys.exit(1)
# Load environment variables (as fallback)
load_dotenv()
class OpenManusClient:
def __init__(self):
# Load configuration
self.config = load_config()
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Initialize AsyncOpenAI client with config
api_key = self.config["llm"]["api_key"] or os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"OpenAI API key not found in config.toml or environment variables"
)
self.openai_client = AsyncOpenAI(
api_key=api_key, base_url=self.config["llm"]["base_url"]
)
async def connect_to_server(self, server_script_path: str = None):
"""Connect to the openmanus MCP server"""
# Use provided path or default from config
script_path = server_script_path or self.config["server"]["default_script"]
server_params = StdioServerParameters(
command="python", args=[script_path], env=None
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def chat_loop(self):
"""Run an interactive chat loop for testing tools"""
print(Fore.CYAN + "\n🚀 OpenManus MCP Client Started!")
print(Fore.GREEN + "Type your queries or 'quit' to exit.")
print(
Fore.YELLOW
+ "Example query: 'What is the recent news about the stock market?'\n"
)
while True:
try:
query = input(Fore.BLUE + "🔍 Query: ").strip()
if query.lower() == "quit":
print(Fore.RED + "👋 Exiting... Goodbye!")
break
response = await self.process_query(query)
print(Fore.MAGENTA + "\n💬 Response: " + response)
except Exception as e:
print(Fore.RED + f"\n❌ Error: {str(e)}")
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
await self.openai_client.close() # Close the OpenAI client
async def process_query(self, query: str) -> str:
"""Process a query using LLM and available tools"""
# Add a system message to set the context for the model
messages = [
{
"role": "system",
"content": "You are a general-purpose AI assistant called OpenManus. You can help users complete a wide range of tasks, providing detailed information and assistance as needed. Please include emojis in your responses to make them more engaging.",
},
{"role": "user", "content": query},
]
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
for tool in response.tools
]
# Initial LLM API call
response = await self.openai_client.chat.completions.create(
model=self.config["llm"]["model"],
messages=messages,
tools=available_tools,
tool_choice="auto",
)
# Process response and handle tool calls
final_text = []
while True:
message = response.choices[0].message
# Add assistant's message to conversation
messages.append(
{
"role": "assistant",
"content": message.content if message.content else None,
"tool_calls": message.tool_calls
if hasattr(message, "tool_calls")
else None,
}
)
# If no tool calls, we're done
if not hasattr(message, "tool_calls") or not message.tool_calls:
if message.content:
final_text.append(message.content)
break
# Handle tool calls
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = tool_call.function.arguments
# Convert tool_args from string to dictionary if necessary
if isinstance(tool_args, str):
try:
tool_args = ast.literal_eval(tool_args)
except (ValueError, SyntaxError) as e:
print(f"Error converting tool_args to dict: {e}")
tool_args = {}
# Ensure tool_args is a dictionary
if not isinstance(tool_args, dict):
tool_args = {}
# Execute tool call
print(f"Calling tool {tool_name} with args: {tool_args}")
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[Calling tool {tool_name}]")
# final_text.append(f"Result: {result.content}")
# Add tool result to messages
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result.content),
}
)
# Get next response from LLM
response = await self.openai_client.chat.completions.create(
model=self.config["llm"]["model"],
messages=messages,
tools=available_tools,
tool_choice="auto",
)
return "\n".join(final_text)
async def main():
if len(sys.argv) > 1:
server_script = sys.argv[1]
else:
server_script = "./openmanus_server/openmanus_server.py"
client = OpenManusClient()
try:
await client.connect_to_server(server_script)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,181 +0,0 @@
import asyncio
import os
from contextlib import AsyncExitStack
from typing import Optional
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class OpenManusClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.stdio = None
self.write = None
async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server via stdio
Args:
server_script_path: Path to the server script
"""
if not server_script_path.endswith(".py"):
raise ValueError("Server script must be a .py file")
# Get the current directory to add to PYTHONPATH
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir) # Get parent directory
# Prepare environment variables
env = os.environ.copy() # Copy current environment
# Add current directory and project root to PYTHONPATH
path_separator = (
";" if os.name == "nt" else ":"
) # Use ; for Windows, : for Unix
if "PYTHONPATH" in env:
env[
"PYTHONPATH"
] = f"{current_dir}{path_separator}{project_root}{path_separator}{env['PYTHONPATH']}"
else:
env["PYTHONPATH"] = f"{current_dir}{path_separator}{project_root}"
server_params = StdioServerParameters(
command="python", args=[server_script_path], env=env
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
return tools
async def run_examples(self):
"""Run example tool calls to demonstrate functionality"""
try:
print("\nExample 1: Google Search")
search_result = await self.session.call_tool(
"google_search", {"query": "Model Context Protocol", "num_results": 5}
)
print(f"Search results: {search_result.content}")
print("\nExample 2: Python Code Execution")
code = """
import math
result = 0
for i in range(1, 10):
result += math.sqrt(i)
print(f"Calculation result: {result}")
"""
python_result = await self.session.call_tool(
"python_execute", {"code": code, "timeout": 3}
)
print(f"Python execution result: {python_result.content}")
print("\nExample 3: File Saving")
file_result = await self.session.call_tool(
"file_saver",
{
"content": "This is a test file content saved through MCP",
"file_path": "mcp_test_file.txt",
},
)
print(f"File save result: {file_result.content}")
print("\nExample 4: Browser Usage")
# Navigate to webpage
browser_result = await self.session.call_tool(
"browser_use", {"action": "navigate", "url": "https://www.example.com"}
)
print(f"Browser navigation result: {browser_result.content}")
# Get browser state
state_result = await self.session.call_tool("get_browser_state", {})
print(f"Browser state: {state_result.content}")
except Exception as e:
print(f"\nError during example execution: {str(e)}")
async def chat_loop(self):
"""Run an interactive chat loop for testing tools"""
print("\nOpenManus MCP Client Started!")
print("Type your commands or 'quit' to exit.")
print(
"Available commands: google_search, python_execute, file_saver, browser_use, get_browser_state"
)
while True:
try:
command = input("\nCommand: ").strip()
if command.lower() == "quit":
break
# Parse command and parameters
parts = command.split(maxsplit=1)
if len(parts) == 0:
continue
tool_name = parts[0]
tool_args = {}
if len(parts) > 1:
try:
tool_args = eval(parts[1]) # Convert string to dict
except:
print(
"Invalid arguments format. Please provide a valid Python dictionary."
)
continue
result = await self.session.call_tool(tool_name, tool_args)
print("\nResult:", result.content)
except Exception as e:
print(f"\nError: {str(e)}")
async def cleanup(self):
"""Clean up resources"""
if self.session:
await self.session.close()
await self.exit_stack.aclose()
print("\nClosed MCP client connection")
async def main():
"""Main entry point"""
import sys
if len(sys.argv) < 2:
print("Usage: python openmanus_client_example.py <path_to_server_script>")
print("Example: python openmanus_client_example.py ../mcp_server.py")
sys.exit(1)
client = OpenManusClient()
try:
await client.connect_to_server(server_script_path=sys.argv[1])
# Run examples first
await client.run_examples()
# Then start interactive chat loop
await client.chat_loop()
except Exception as e:
print(f"Error: {str(e)}")
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -172,25 +172,15 @@ def parse_args():
parser = argparse.ArgumentParser(description="OpenManus MCP Server")
parser.add_argument(
"--transport",
choices=["stdio", "http"],
choices=["stdio"],
default="stdio",
help="Communication method: stdio or http (default: stdio)",
)
parser.add_argument(
"--host", default="127.0.0.1", help="HTTP server host (default: 127.0.0.1)"
)
parser.add_argument(
"--port", type=int, default=8000, help="HTTP server port (default: 8000)"
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
if args.transport == "stdio":
logger.info("Starting OpenManus server (stdio mode)")
openmanus.run(transport="stdio")
else:
logger.info(f"Starting OpenManus server (HTTP mode) at {args.host}:{args.port}")
openmanus.run(transport="http", host=args.host, port=args.port)