Compare commits

main..flow-auto-validate

No commits in common. "main" and "flow-auto-validate" have entirely different histories.

11 changed files with 197 additions and 355 deletions

app/bedrock.py

@@ -1,18 +1,15 @@
+from typing import Dict, List, Literal, Optional, Union
+import boto3
 import json
-import sys
 import time
 import uuid
 from datetime import datetime
-from typing import Dict, List, Literal, Optional
-
-import boto3
+import sys

 # Global variables to track the current tool use ID across function calls
 # Tmp solution
 CURRENT_TOOLUSE_ID = None

 # Class to handle OpenAI-style response formatting
 class OpenAIResponse:
     def __init__(self, data):
@@ -21,37 +18,31 @@ class OpenAIResponse:
             if isinstance(value, dict):
                 value = OpenAIResponse(value)
             elif isinstance(value, list):
-                value = [
-                    OpenAIResponse(item) if isinstance(item, dict) else item
-                    for item in value
-                ]
+                value = [OpenAIResponse(item) if isinstance(item, dict) else item for item in value]
             setattr(self, key, value)

     def model_dump(self, *args, **kwargs):
         # Convert object to dict and add timestamp
         data = self.__dict__
-        data["created_at"] = datetime.now().isoformat()
+        data['created_at'] = datetime.now().isoformat()
         return data

 # Main client class for interacting with Amazon Bedrock
 class BedrockClient:
     def __init__(self):
         # Initialize Bedrock client, you need to configure AWS env first
         try:
-            self.client = boto3.client("bedrock-runtime")
+            self.client = boto3.client('bedrock-runtime')
             self.chat = Chat(self.client)
         except Exception as e:
             print(f"Error initializing Bedrock client: {e}")
             sys.exit(1)

 # Chat interface class
 class Chat:
     def __init__(self, client):
         self.completions = ChatCompletions(client)

 # Core class handling chat completions functionality
 class ChatCompletions:
     def __init__(self, client):
@@ -61,23 +52,19 @@ class ChatCompletions:
         # Convert OpenAI function calling format to Bedrock tool format
         bedrock_tools = []
         for tool in tools:
-            if tool.get("type") == "function":
-                function = tool.get("function", {})
+            if tool.get('type') == 'function':
+                function = tool.get('function', {})
                 bedrock_tool = {
                     "toolSpec": {
-                        "name": function.get("name", ""),
-                        "description": function.get("description", ""),
+                        "name": function.get('name', ''),
+                        "description": function.get('description', ''),
                         "inputSchema": {
                             "json": {
                                 "type": "object",
-                                "properties": function.get("parameters", {}).get(
-                                    "properties", {}
-                                ),
-                                "required": function.get("parameters", {}).get(
-                                    "required", []
-                                ),
+                                "properties": function.get('parameters', {}).get('properties', {}),
+                                "required": function.get('parameters', {}).get('required', [])
                             }
-                        },
-                    },
+                        }
+                    }
                 }
                 bedrock_tools.append(bedrock_tool)
@@ -88,43 +75,41 @@ class ChatCompletions:
         bedrock_messages = []
         system_prompt = []
         for message in messages:
-            if message.get("role") == "system":
-                system_prompt = [{"text": message.get("content")}]
-            elif message.get("role") == "user":
+            if message.get('role') == 'system':
+                system_prompt = [{"text": message.get('content')}]
+            elif message.get('role') == 'user':
                 bedrock_message = {
-                    "role": message.get("role", "user"),
-                    "content": [{"text": message.get("content")}],
+                    "role": message.get('role', 'user'),
+                    "content": [{"text": message.get('content')}]
                 }
                 bedrock_messages.append(bedrock_message)
-            elif message.get("role") == "assistant":
+            elif message.get('role') == 'assistant':
                 bedrock_message = {
                     "role": "assistant",
-                    "content": [{"text": message.get("content")}],
+                    "content": [{"text": message.get('content')}]
                 }
-                openai_tool_calls = message.get("tool_calls", [])
+                openai_tool_calls = message.get('tool_calls', [])
                 if openai_tool_calls:
                     bedrock_tool_use = {
-                        "toolUseId": openai_tool_calls[0]["id"],
-                        "name": openai_tool_calls[0]["function"]["name"],
-                        "input": json.loads(
-                            openai_tool_calls[0]["function"]["arguments"]
-                        ),
+                        "toolUseId": openai_tool_calls[0]['id'],
+                        "name": openai_tool_calls[0]['function']['name'],
+                        "input": json.loads(openai_tool_calls[0]['function']['arguments'])
                     }
-                    bedrock_message["content"].append({"toolUse": bedrock_tool_use})
+                    bedrock_message['content'].append({"toolUse": bedrock_tool_use})
                     global CURRENT_TOOLUSE_ID
-                    CURRENT_TOOLUSE_ID = openai_tool_calls[0]["id"]
+                    CURRENT_TOOLUSE_ID = openai_tool_calls[0]['id']
                 bedrock_messages.append(bedrock_message)
-            elif message.get("role") == "tool":
+            elif message.get('role') == 'tool':
                 bedrock_message = {
                     "role": "user",
                     "content": [
                         {
                             "toolResult": {
                                 "toolUseId": CURRENT_TOOLUSE_ID,
-                                "content": [{"text": message.get("content")}],
+                                "content": [{"text":message.get('content')}]
                             }
                         }
-                    ],
+                    ]
                 }
                 bedrock_messages.append(bedrock_message)
             else:
@@ -134,27 +119,26 @@ class ChatCompletions:
     def _convert_bedrock_response_to_openai_format(self, bedrock_response):
         # Convert Bedrock response format to OpenAI format
         content = ""
-        if bedrock_response.get("output", {}).get("message", {}).get("content"):
-            content_array = bedrock_response["output"]["message"]["content"]
-            content = "".join(item.get("text", "") for item in content_array)
-        if content == "":
-            content = "."
+        if bedrock_response.get('output', {}).get('message', {}).get('content'):
+            content_array = bedrock_response['output']['message']['content']
+            content = "".join(item.get('text', '') for item in content_array)
+        if content == "": content = "."

         # Handle tool calls in response
         openai_tool_calls = []
-        if bedrock_response.get("output", {}).get("message", {}).get("content"):
-            for content_item in bedrock_response["output"]["message"]["content"]:
-                if content_item.get("toolUse"):
-                    bedrock_tool_use = content_item["toolUse"]
+        if bedrock_response.get('output', {}).get('message', {}).get('content'):
+            for content_item in bedrock_response['output']['message']['content']:
+                if content_item.get('toolUse'):
+                    bedrock_tool_use = content_item['toolUse']
                     global CURRENT_TOOLUSE_ID
-                    CURRENT_TOOLUSE_ID = bedrock_tool_use["toolUseId"]
+                    CURRENT_TOOLUSE_ID = bedrock_tool_use['toolUseId']
                     openai_tool_call = {
-                        "id": CURRENT_TOOLUSE_ID,
-                        "type": "function",
-                        "function": {
-                            "name": bedrock_tool_use["name"],
-                            "arguments": json.dumps(bedrock_tool_use["input"]),
-                        },
+                        'id': CURRENT_TOOLUSE_ID,
+                        'type': 'function',
+                        'function': {
+                            'name': bedrock_tool_use['name'],
+                            'arguments': json.dumps(bedrock_tool_use['input'])
+                        }
                     }
                     openai_tool_calls.append(openai_tool_call)
@@ -166,29 +150,21 @@ class ChatCompletions:
             "system_fingerprint": None,
             "choices": [
                 {
-                    "finish_reason": bedrock_response.get("stopReason", "end_turn"),
+                    "finish_reason": bedrock_response.get('stopReason', 'end_turn'),
                     "index": 0,
                     "message": {
                         "content": content,
-                        "role": bedrock_response.get("output", {})
-                        .get("message", {})
-                        .get("role", "assistant"),
-                        "tool_calls": openai_tool_calls
-                        if openai_tool_calls != []
-                        else None,
-                        "function_call": None,
-                    },
+                        "role": bedrock_response.get('output', {}).get('message', {}).get('role', 'assistant'),
+                        "tool_calls": openai_tool_calls if openai_tool_calls != [] else None,
+                        "function_call": None
+                    }
                 }
             ],
             "usage": {
-                "completion_tokens": bedrock_response.get("usage", {}).get(
-                    "outputTokens", 0
-                ),
-                "prompt_tokens": bedrock_response.get("usage", {}).get(
-                    "inputTokens", 0
-                ),
-                "total_tokens": bedrock_response.get("usage", {}).get("totalTokens", 0),
-            },
+                "completion_tokens": bedrock_response.get('usage', {}).get('outputTokens', 0),
+                "prompt_tokens": bedrock_response.get('usage', {}).get('inputTokens', 0),
+                "total_tokens": bedrock_response.get('usage', {}).get('totalTokens', 0)
+            }
         }
         return OpenAIResponse(openai_format)
@@ -200,19 +176,16 @@ class ChatCompletions:
         temperature: float,
         tools: Optional[List[dict]] = None,
         tool_choice: Literal["none", "auto", "required"] = "auto",
-        **kwargs,
+        **kwargs
     ) -> OpenAIResponse:
         # Non-streaming invocation of Bedrock model
-        (
-            system_prompt,
-            bedrock_messages,
-        ) = self._convert_openai_messages_to_bedrock_format(messages)
+        system_prompt, bedrock_messages = self._convert_openai_messages_to_bedrock_format(messages)
         response = self.client.converse(
-            modelId=model,
-            system=system_prompt,
-            messages=bedrock_messages,
-            inferenceConfig={"temperature": temperature, "maxTokens": max_tokens},
-            toolConfig={"tools": tools} if tools else None,
+            modelId = model,
+            system = system_prompt,
+            messages = bedrock_messages,
+            inferenceConfig = {"temperature": temperature, "maxTokens": max_tokens},
+            toolConfig = {"tools": tools} if tools else None,
         )
         openai_response = self._convert_bedrock_response_to_openai_format(response)
         return openai_response
@@ -225,76 +198,60 @@ class ChatCompletions:
         temperature: float,
         tools: Optional[List[dict]] = None,
         tool_choice: Literal["none", "auto", "required"] = "auto",
-        **kwargs,
+        **kwargs
    ) -> OpenAIResponse:
         # Streaming invocation of Bedrock model
-        (
-            system_prompt,
-            bedrock_messages,
-        ) = self._convert_openai_messages_to_bedrock_format(messages)
+        system_prompt, bedrock_messages = self._convert_openai_messages_to_bedrock_format(messages)
         response = self.client.converse_stream(
-            modelId=model,
-            system=system_prompt,
-            messages=bedrock_messages,
-            inferenceConfig={"temperature": temperature, "maxTokens": max_tokens},
-            toolConfig={"tools": tools} if tools else None,
+            modelId = model,
+            system = system_prompt,
+            messages = bedrock_messages,
+            inferenceConfig = {"temperature": temperature, "maxTokens": max_tokens},
+            toolConfig = {"tools": tools} if tools else None,
         )

         # Initialize response structure
         bedrock_response = {
-            "output": {"message": {"role": "", "content": []}},
-            "stopReason": "",
-            "usage": {},
-            "metrics": {},
+            'output': {
+                'message': {
+                    'role': '',
+                    'content': []
+                }
+            },
+            'stopReason': '',
+            'usage': {},
+            'metrics': {}
         }
         bedrock_response_text = ""
         bedrock_response_tool_input = ""

         # Process streaming response
-        stream = response.get("stream")
+        stream = response.get('stream')
         if stream:
             for event in stream:
-                if event.get("messageStart", {}).get("role"):
-                    bedrock_response["output"]["message"]["role"] = event[
-                        "messageStart"
-                    ]["role"]
-                if event.get("contentBlockDelta", {}).get("delta", {}).get("text"):
-                    bedrock_response_text += event["contentBlockDelta"]["delta"]["text"]
-                    print(
-                        event["contentBlockDelta"]["delta"]["text"], end="", flush=True
-                    )
-                if event.get("contentBlockStop", {}).get("contentBlockIndex") == 0:
-                    bedrock_response["output"]["message"]["content"].append(
-                        {"text": bedrock_response_text}
-                    )
-                if event.get("contentBlockStart", {}).get("start", {}).get("toolUse"):
-                    bedrock_tool_use = event["contentBlockStart"]["start"]["toolUse"]
+                if event.get('messageStart', {}).get('role'):
+                    bedrock_response['output']['message']['role'] = event['messageStart']['role']
+                if event.get('contentBlockDelta', {}).get('delta', {}).get('text'):
+                    bedrock_response_text += event['contentBlockDelta']['delta']['text']
+                    print(event['contentBlockDelta']['delta']['text'], end='', flush=True)
+                if event.get('contentBlockStop', {}).get('contentBlockIndex') == 0:
+                    bedrock_response['output']['message']['content'].append({"text": bedrock_response_text})
+                if event.get('contentBlockStart', {}).get('start', {}).get('toolUse'):
+                    bedrock_tool_use = event['contentBlockStart']['start']['toolUse']
                     tool_use = {
-                        "toolUseId": bedrock_tool_use["toolUseId"],
-                        "name": bedrock_tool_use["name"],
+                        "toolUseId": bedrock_tool_use['toolUseId'],
+                        "name": bedrock_tool_use['name'],
                     }
-                    bedrock_response["output"]["message"]["content"].append(
-                        {"toolUse": tool_use}
-                    )
+                    bedrock_response['output']['message']['content'].append({"toolUse": tool_use})
                     global CURRENT_TOOLUSE_ID
-                    CURRENT_TOOLUSE_ID = bedrock_tool_use["toolUseId"]
-                if event.get("contentBlockDelta", {}).get("delta", {}).get("toolUse"):
-                    bedrock_response_tool_input += event["contentBlockDelta"]["delta"][
-                        "toolUse"
-                    ]["input"]
-                    print(
-                        event["contentBlockDelta"]["delta"]["toolUse"]["input"],
-                        end="",
-                        flush=True,
-                    )
-                if event.get("contentBlockStop", {}).get("contentBlockIndex") == 1:
-                    bedrock_response["output"]["message"]["content"][1]["toolUse"][
-                        "input"
-                    ] = json.loads(bedrock_response_tool_input)
+                    CURRENT_TOOLUSE_ID = bedrock_tool_use['toolUseId']
+                if event.get('contentBlockDelta', {}).get('delta', {}).get('toolUse'):
+                    bedrock_response_tool_input += event['contentBlockDelta']['delta']['toolUse']['input']
+                    print(event['contentBlockDelta']['delta']['toolUse']['input'], end='', flush=True)
+                if event.get('contentBlockStop', {}).get('contentBlockIndex') == 1:
+                    bedrock_response['output']['message']['content'][1]['toolUse']['input'] = json.loads(bedrock_response_tool_input)
         print()
-        openai_response = self._convert_bedrock_response_to_openai_format(
-            bedrock_response
-        )
+        openai_response = self._convert_bedrock_response_to_openai_format(bedrock_response)
         return openai_response

     def create(
@@ -306,29 +263,13 @@ class ChatCompletions:
         stream: Optional[bool] = True,
         tools: Optional[List[dict]] = None,
         tool_choice: Literal["none", "auto", "required"] = "auto",
-        **kwargs,
+        **kwargs
     ) -> OpenAIResponse:
         # Main entry point for chat completion
         bedrock_tools = []
         if tools is not None:
             bedrock_tools = self._convert_openai_tools_to_bedrock_format(tools)
         if stream:
-            return self._invoke_bedrock_stream(
-                model,
-                messages,
-                max_tokens,
-                temperature,
-                bedrock_tools,
-                tool_choice,
-                **kwargs,
-            )
+            return self._invoke_bedrock_stream(model, messages, max_tokens, temperature, bedrock_tools, tool_choice, **kwargs)
         else:
-            return self._invoke_bedrock(
-                model,
-                messages,
-                max_tokens,
-                temperature,
-                bedrock_tools,
-                tool_choice,
-                **kwargs,
-            )
+            return self._invoke_bedrock(model, messages, max_tokens, temperature, bedrock_tools, tool_choice, **kwargs)
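
Note (not part of the diff): on both branches the wrapper keeps the OpenAI client surface shown above, so calling code is unchanged. A minimal usage sketch, assuming AWS credentials and region are already configured for boto3; the Bedrock model ID below is illustrative, not taken from the diff:

# Minimal usage sketch for the OpenAI-style Bedrock wrapper above.
from app.bedrock import BedrockClient

client = BedrockClient()
response = client.chat.completions.create(
    model="anthropic.claude-3-5-sonnet-20240620-v1:0",  # assumed/illustrative model ID
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Say hello."},
    ],
    max_tokens=256,
    temperature=0.0,
    stream=False,  # stream=True prints tokens as they arrive
)
print(response.choices[0].message.content)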

app/config.py

@@ -37,18 +37,6 @@ class ProxySettings(BaseModel):
 class SearchSettings(BaseModel):
     engine: str = Field(default="Google", description="Search engine the llm to use")
-    fallback_engines: List[str] = Field(
-        default_factory=lambda: ["DuckDuckGo", "Baidu"],
-        description="Fallback search engines to try if the primary engine fails",
-    )
-    retry_delay: int = Field(
-        default=60,
-        description="Seconds to wait before retrying all engines again after they all fail",
-    )
-    max_retries: int = Field(
-        default=3,
-        description="Maximum number of times to retry all engines when all fail",
-    )

 class BrowserSettings(BaseModel):

app/flow/base.py

@@ -1,4 +1,5 @@
 from abc import ABC, abstractmethod
+from enum import Enum
 from typing import Dict, List, Optional, Union

 from pydantic import BaseModel
@@ -6,6 +7,10 @@ from pydantic import BaseModel
 from app.agent.base import BaseAgent

+class FlowType(str, Enum):
+    PLANNING = "planning"
+
 class BaseFlow(BaseModel, ABC):
     """Base class for execution flows supporting multiple agents"""
@@ -55,3 +60,32 @@ class BaseFlow(BaseModel, ABC):
     @abstractmethod
     async def execute(self, input_text: str) -> str:
         """Execute the flow with given input"""
+
+class PlanStepStatus(str, Enum):
+    """Enum class defining possible statuses of a plan step"""
+
+    NOT_STARTED = "not_started"
+    IN_PROGRESS = "in_progress"
+    COMPLETED = "completed"
+    BLOCKED = "blocked"
+
+    @classmethod
+    def get_all_statuses(cls) -> list[str]:
+        """Return a list of all possible step status values"""
+        return [status.value for status in cls]
+
+    @classmethod
+    def get_active_statuses(cls) -> list[str]:
+        """Return a list of values representing active statuses (not started or in progress)"""
+        return [cls.NOT_STARTED.value, cls.IN_PROGRESS.value]
+
+    @classmethod
+    def get_status_marks(cls) -> Dict[str, str]:
+        """Return a mapping of statuses to their marker symbols"""
+        return {
+            cls.COMPLETED.value: "[✓]",
+            cls.IN_PROGRESS.value: "[→]",
+            cls.BLOCKED.value: "[!]",
+            cls.NOT_STARTED.value: "[ ]",
+        }
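
Note (not part of the diff): the branch moves PlanStepStatus into app.flow.base, so plan-progress rendering can be driven entirely by the enum's helpers. A minimal sketch using only what this hunk defines:

from app.flow.base import PlanStepStatus

steps = [
    ("Collect requirements", PlanStepStatus.COMPLETED.value),
    ("Draft implementation", PlanStepStatus.IN_PROGRESS.value),
    ("Write tests", PlanStepStatus.NOT_STARTED.value),
]

marks = PlanStepStatus.get_status_marks()
for title, status in steps:
    print(f"{marks[status]} {title}")  # e.g. "[✓] Collect requirements"

# Steps still pending or underway, per get_active_statuses():
active = [t for t, s in steps if s in PlanStepStatus.get_active_statuses()]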

app/flow/flow_factory.py

@@ -1,15 +1,10 @@
-from enum import Enum
 from typing import Dict, List, Union

 from app.agent.base import BaseAgent
-from app.flow.base import BaseFlow
+from app.flow.base import BaseFlow, FlowType
 from app.flow.planning import PlanningFlow

-class FlowType(str, Enum):
-    PLANNING = "planning"
-
 class FlowFactory:
     """Factory for creating different types of flows with support for multiple agents"""

app/flow/planning.py

@@ -1,47 +1,17 @@
 import json
 import time
-from enum import Enum
 from typing import Dict, List, Optional, Union

 from pydantic import Field

 from app.agent.base import BaseAgent
-from app.flow.base import BaseFlow
+from app.flow.base import BaseFlow, PlanStepStatus
 from app.llm import LLM
 from app.logger import logger
 from app.schema import AgentState, Message, ToolChoice
 from app.tool import PlanningTool

-class PlanStepStatus(str, Enum):
-    """Enum class defining possible statuses of a plan step"""
-
-    NOT_STARTED = "not_started"
-    IN_PROGRESS = "in_progress"
-    COMPLETED = "completed"
-    BLOCKED = "blocked"
-
-    @classmethod
-    def get_all_statuses(cls) -> list[str]:
-        """Return a list of all possible step status values"""
-        return [status.value for status in cls]
-
-    @classmethod
-    def get_active_statuses(cls) -> list[str]:
-        """Return a list of values representing active statuses (not started or in progress)"""
-        return [cls.NOT_STARTED.value, cls.IN_PROGRESS.value]
-
-    @classmethod
-    def get_status_marks(cls) -> Dict[str, str]:
-        """Return a mapping of statuses to their marker symbols"""
-        return {
-            cls.COMPLETED.value: "[✓]",
-            cls.IN_PROGRESS.value: "[→]",
-            cls.BLOCKED.value: "[!]",
-            cls.NOT_STARTED.value: "[ ]",
-        }

 class PlanningFlow(BaseFlow):
     """A flow that manages planning and execution of tasks using agents."""

app/llm.py

@@ -18,7 +18,6 @@ from tenacity import (
     wait_random_exponential,
 )

-from app.bedrock import BedrockClient
 from app.config import LLMSettings, config
 from app.exceptions import TokenLimitExceeded
 from app.logger import logger  # Assuming a logger is set up in your app
@@ -29,6 +28,7 @@ from app.schema import (
     Message,
     ToolChoice,
 )
+from app.bedrock import BedrockClient

 REASONING_MODELS = ["o1", "o3-mini"]

app/mcp/server.py

@@ -1,19 +1,30 @@
-import logging
-import sys
-
-logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stderr)])
-
 import argparse
 import asyncio
 import atexit
 import json
+import logging
+import os
+import sys
 from inspect import Parameter, Signature
 from typing import Any, Dict, Optional

 from mcp.server.fastmcp import FastMCP

-from app.logger import logger
+# Add directories to Python path (needed for proper importing)
+current_dir = os.path.dirname(os.path.abspath(__file__))
+parent_dir = os.path.dirname(current_dir)
+root_dir = os.path.dirname(parent_dir)
+sys.path.insert(0, parent_dir)
+sys.path.insert(0, current_dir)
+sys.path.insert(0, root_dir)
+
+# Configure logging (using the same format as original)
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger("mcp-server")

 from app.tool.base import BaseTool
 from app.tool.bash import Bash
 from app.tool.browser_use_tool import BrowserUseTool
@@ -34,6 +45,11 @@ class MCPServer:
         self.tools["editor"] = StrReplaceEditor()
         self.tools["terminate"] = Terminate()

+        from app.logger import logger as app_logger
+        global logger
+        logger = app_logger
+
     def register_tool(self, tool: BaseTool, method_name: Optional[str] = None) -> None:
         """Register a tool with parameter validation and documentation."""
         tool_name = method_name or tool.name

app/tool/web_search.py

@@ -4,7 +4,6 @@ from typing import List
 from tenacity import retry, stop_after_attempt, wait_exponential

 from app.config import config
-from app.logger import logger
 from app.tool.base import BaseTool
 from app.tool.search import (
     BaiduSearchEngine,
@@ -45,8 +44,6 @@ class WebSearch(BaseTool):
     async def execute(self, query: str, num_results: int = 10) -> List[str]:
         """
         Execute a Web search and return a list of URLs.
-        Tries engines in order based on configuration, falling back if an engine fails with errors.
-        If all engines fail, it will wait and retry up to the configured number of times.

         Args:
             query (str): The search query to submit to the search engine.
@@ -55,109 +52,37 @@ class WebSearch(BaseTool):
         Returns:
             List[str]: A list of URLs matching the search query.
         """
-        # Get retry settings from config
-        retry_delay = 60  # Default to 60 seconds
-        max_retries = 3  # Default to 3 retries
-        if config.search_config:
-            retry_delay = getattr(config.search_config, "retry_delay", 60)
-            max_retries = getattr(config.search_config, "max_retries", 3)
-
-        # Try searching with retries when all engines fail
-        for retry_count in range(
-            max_retries + 1
-        ):  # +1 because first try is not a retry
-            links = await self._try_all_engines(query, num_results)
-            if links:
-                return links
-
-            if retry_count < max_retries:
-                # All engines failed, wait and retry
-                logger.warning(
-                    f"All search engines failed. Waiting {retry_delay} seconds before retry {retry_count + 1}/{max_retries}..."
-                )
-                await asyncio.sleep(retry_delay)
-            else:
-                logger.error(
-                    f"All search engines failed after {max_retries} retries. Giving up."
-                )
-        return []
-
-    async def _try_all_engines(self, query: str, num_results: int) -> List[str]:
-        """
-        Try all search engines in the configured order.
-
-        Args:
-            query (str): The search query to submit to the search engine.
-            num_results (int): The number of search results to return.
-
-        Returns:
-            List[str]: A list of URLs matching the search query, or empty list if all engines fail.
-        """
         engine_order = self._get_engine_order()
-        failed_engines = []
         for engine_name in engine_order:
             engine = self._search_engine[engine_name]
             try:
-                logger.info(f"🔎 Attempting search with {engine_name.capitalize()}...")
                 links = await self._perform_search_with_engine(
                     engine, query, num_results
                 )
                 if links:
-                    if failed_engines:
-                        logger.info(
-                            f"Search successful with {engine_name.capitalize()} after trying: {', '.join(failed_engines)}"
-                        )
                     return links
             except Exception as e:
-                failed_engines.append(engine_name.capitalize())
-                is_rate_limit = "429" in str(e) or "Too Many Requests" in str(e)
-
-                if is_rate_limit:
-                    logger.warning(
-                        f"⚠️ {engine_name.capitalize()} search engine rate limit exceeded, trying next engine..."
-                    )
-                else:
-                    logger.warning(
-                        f"⚠️ {engine_name.capitalize()} search failed with error: {e}"
-                    )
-        if failed_engines:
-            logger.error(f"All search engines failed: {', '.join(failed_engines)}")
+                print(f"Search engine '{engine_name}' failed with error: {e}")
         return []

     def _get_engine_order(self) -> List[str]:
         """
         Determines the order in which to try search engines.
-        Preferred engine is first (based on configuration), followed by fallback engines,
-        and then the remaining engines.
+        Preferred engine is first (based on configuration), followed by the remaining engines.

         Returns:
             List[str]: Ordered list of search engine names.
         """
         preferred = "google"
-        fallbacks = []
-        if config.search_config:
-            if config.search_config.engine:
-                preferred = config.search_config.engine.lower()
-            if config.search_config.fallback_engines:
-                fallbacks = [
-                    engine.lower() for engine in config.search_config.fallback_engines
-                ]
+        if config.search_config and config.search_config.engine:
+            preferred = config.search_config.engine.lower()
         engine_order = []
-        # Add preferred engine first
         if preferred in self._search_engine:
             engine_order.append(preferred)
-
-        # Add configured fallback engines in order
-        for fallback in fallbacks:
-            if fallback in self._search_engine and fallback not in engine_order:
-                engine_order.append(fallback)
-
+        for key in self._search_engine:
+            if key not in engine_order:
+                engine_order.append(key)
         return engine_order

     @retry(
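
Note (not part of the diff): the branch drops main's fallback/retry machinery, so engine selection reduces to "preferred engine first, then the remaining registered engines in insertion order". A standalone sketch of that ordering, with plain strings standing in for the real engine objects:

# Standalone sketch of the simplified _get_engine_order logic on the branch.
search_engines = {
    "google": "GoogleSearchEngine",
    "baidu": "BaiduSearchEngine",
    "duckduckgo": "DuckDuckGoSearchEngine",
}

def get_engine_order(preferred: str = "google") -> list[str]:
    order = []
    if preferred in search_engines:
        order.append(preferred)
    for key in search_engines:  # dicts preserve insertion order in Python 3.7+
        if key not in order:
            order.append(key)
    return order

print(get_engine_order("baidu"))  # ['baidu', 'google', 'duckduckgo']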

config/config.example.toml

@@ -73,13 +73,6 @@ temperature = 0.0 # Controls randomness for vision mod
 # [search]
 # Search engine for agent to use. Default is "Google", can be set to "Baidu" or "DuckDuckGo".
 #engine = "Google"
-# Fallback engine order. Default is ["DuckDuckGo", "Baidu"] - will try in this order after primary engine fails.
-#fallback_engines = ["DuckDuckGo", "Baidu"]
-# Seconds to wait before retrying all engines again when they all fail due to rate limits. Default is 60.
-#retry_delay = 60
-# Maximum number of times to retry all engines when all fail. Default is 3.
-#max_retries = 3

 ## Sandbox configuration
 #[sandbox]

run_mcp.py

@@ -13,14 +13,10 @@ class MCPRunner:
     def __init__(self):
         self.root_path = config.root_path
-        self.server_reference = "app.mcp.server"
+        self.server_script = self.root_path / "app" / "mcp" / "server.py"
         self.agent = MCPAgent()

-    async def initialize(
-        self,
-        connection_type: str,
-        server_url: str | None = None,
-    ) -> None:
+    async def initialize(self, connection_type: str, server_url: str = None) -> None:
         """Initialize the MCP agent with the appropriate connection."""
         logger.info(f"Initializing MCPAgent with {connection_type} connection...")
@@ -28,7 +24,7 @@ class MCPRunner:
             await self.agent.initialize(
                 connection_type="stdio",
                 command=sys.executable,
-                args=["-m", self.server_reference],
+                args=[str(self.server_script)],
             )
         else:  # sse
             await self.agent.initialize(connection_type="sse", server_url=server_url)
@@ -51,14 +47,9 @@ class MCPRunner:
     async def run_default(self) -> None:
         """Run the agent in default mode."""
-        prompt = input("Enter your prompt: ")
-        if not prompt.strip():
-            logger.warning("Empty prompt provided.")
-            return
-
-        logger.warning("Processing your request...")
-        await self.agent.run(prompt)
-        logger.info("Request processing completed.")
+        await self.agent.run(
+            "Hello, what tools are available to me? Terminate after you have listed the tools."
+        )

     async def cleanup(self) -> None:
         """Clean up agent resources."""

run_mcp_server.py

@@ -1,11 +0,0 @@
-# coding: utf-8
-# A shortcut to launch OpenManus MCP server, where its introduction also solves other import issues.
-from app.mcp.server import MCPServer, parse_args
-
-
-if __name__ == "__main__":
-    args = parse_args()
-
-    # Create and run server (maintaining original flow)
-    server = MCPServer()
-    server.run(transport=args.transport)