Add 4 tools: PythonExecute(), GoogleSearch(), BrowserUseTool(), FileSaver()

This commit is contained in:
xiangjinyu 2025-03-06 23:10:14 +08:00
parent cdd9467cd2
commit 04a1e0a504
4 changed files with 415 additions and 0 deletions

View File

@ -0,0 +1,238 @@
import asyncio
import json
from typing import Optional
from browser_use import Browser as BrowserUseBrowser, BrowserConfig
from browser_use.browser.context import BrowserContext
from browser_use.dom.service import DomService
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo
from app.tool.base import BaseTool, ToolResult
# Human-readable description assigned to BrowserUseTool.description; it lists
# the same action names that appear in the tool's "action" enum.
_BROWSER_DESCRIPTION = """
Interact with a web browser to perform various actions such as navigation, element interaction,
content extraction, and tab management. Supported actions include:
- 'navigate': Go to a specific URL
- 'click': Click an element by index
- 'input_text': Input text into an element
- 'screenshot': Capture a screenshot
- 'get_html': Get page HTML content
- 'execute_js': Execute JavaScript code
- 'scroll': Scroll the page
- 'switch_tab': Switch to a specific tab
- 'new_tab': Open a new tab
- 'close_tab': Close the current tab
- 'refresh': Refresh the current page
"""
class BrowserUseTool(BaseTool):
    """Tool that drives a ``browser_use`` browser session.

    The browser, its context and the DOM service are created lazily on first
    use and then reused. ``execute``, ``get_current_state`` and ``cleanup``
    all acquire ``lock``, so calls on one instance run one at a time.
    """

    name: str = "browser_use"
    description: str = _BROWSER_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "navigate",
                    "click",
                    "input_text",
                    "screenshot",
                    "get_html",
                    "execute_js",
                    "scroll",
                    "switch_tab",
                    "new_tab",
                    "close_tab",
                    "refresh",
                ],
                "description": "The browser action to perform",
            },
            "url": {
                "type": "string",
                "description": "URL for 'navigate' or 'new_tab' actions",
            },
            "index": {
                "type": "integer",
                "description": "Element index for 'click' or 'input_text' actions",
            },
            "text": {
                "type": "string",
                "description": "Text for 'input_text' action",
            },
            "script": {
                "type": "string",
                "description": "JavaScript code for 'execute_js' action",
            },
            "scroll_amount": {
                "type": "integer",
                "description": "Pixels to scroll (positive for down, negative for up) for 'scroll' action",
            },
            "tab_id": {
                "type": "integer",
                "description": "Tab ID for 'switch_tab' action",
            },
        },
        "required": ["action"],
        # NOTE(review): these keys are action *values*, whereas JSON Schema's
        # `dependencies` keyword is keyed by property name. The per-action
        # requirements are enforced at runtime in execute(); confirm how
        # schema consumers interpret this section.
        "dependencies": {
            "navigate": ["url"],
            "click": ["index"],
            "input_text": ["index", "text"],
            "execute_js": ["script"],
            "switch_tab": ["tab_id"],
            "new_tab": ["url"],
            "scroll": ["scroll_amount"],
        },
    }

    # Serializes every browser operation performed through this instance.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
    # Populated on demand by _ensure_browser_initialized(); exclude=True keeps
    # these live handles out of model dumps.
    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
    context: Optional[BrowserContext] = Field(default=None, exclude=True)
    dom_service: Optional[DomService] = Field(default=None, exclude=True)

    @field_validator("parameters", mode="before")
    @classmethod
    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
        """Reject an empty ``parameters`` schema.

        Raises:
            ValueError: If ``v`` is falsy (``None`` or an empty dict).
        """
        if not v:
            raise ValueError("Parameters cannot be empty")
        return v

    async def _ensure_browser_initialized(self) -> BrowserContext:
        """Create the browser, context and DOM service if missing.

        Returns:
            The (possibly newly created) browser context.
        """
        if self.browser is None:
            self.browser = BrowserUseBrowser(BrowserConfig(headless=False))
        if self.context is None:
            self.context = await self.browser.new_context()
            self.dom_service = DomService(await self.context.get_current_page())
        return self.context

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        script: Optional[str] = None,
        scroll_amount: Optional[int] = None,
        tab_id: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        """Execute a specified browser action.

        Args:
            action: The browser action to perform.
            url: URL for navigation or new tab.
            index: Element index for click or input actions.
            text: Text for input action.
            script: JavaScript code for execution.
            scroll_amount: Pixels to scroll for scroll action.
            tab_id: Tab ID for switch_tab action.
            **kwargs: Additional arguments (accepted and ignored).

        Returns:
            ToolResult with the action's output, or with ``error`` set when a
            required argument is missing, the action is unknown, or the
            underlying browser call raises.
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()

                if action == "navigate":
                    if not url:
                        return ToolResult(error="URL is required for 'navigate' action")
                    await context.navigate_to(url)
                    return ToolResult(output=f"Navigated to {url}")

                elif action == "click":
                    if index is None:
                        return ToolResult(error="Index is required for 'click' action")
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    # The click helper's return value is treated as a download
                    # path when truthy.
                    download_path = await context._click_element_node(element)
                    output = f"Clicked element at index {index}"
                    if download_path:
                        output += f" - Downloaded file to {download_path}"
                    return ToolResult(output=output)

                elif action == "input_text":
                    if index is None or not text:
                        return ToolResult(error="Index and text are required for 'input_text' action")
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    await context._input_text_element_node(element, text)
                    return ToolResult(output=f"Input '{text}' into element at index {index}")

                elif action == "screenshot":
                    screenshot = await context.take_screenshot(full_page=True)
                    # Only the length goes into `output`; the screenshot data
                    # itself is carried in the `system` field.
                    return ToolResult(
                        output=f"Screenshot captured (base64 length: {len(screenshot)})",
                        system=screenshot,
                    )

                elif action == "get_html":
                    html = await context.get_page_html()
                    # Cap the returned HTML at 2000 characters.
                    truncated = html[:2000] + "..." if len(html) > 2000 else html
                    return ToolResult(output=truncated)

                elif action == "execute_js":
                    if not script:
                        return ToolResult(error="Script is required for 'execute_js' action")
                    result = await context.execute_javascript(script)
                    return ToolResult(output=str(result))

                elif action == "scroll":
                    if scroll_amount is None:
                        return ToolResult(error="Scroll amount is required for 'scroll' action")
                    await context.execute_javascript(f"window.scrollBy(0, {scroll_amount});")
                    direction = "down" if scroll_amount > 0 else "up"
                    return ToolResult(output=f"Scrolled {direction} by {abs(scroll_amount)} pixels")

                elif action == "switch_tab":
                    if tab_id is None:
                        return ToolResult(error="Tab ID is required for 'switch_tab' action")
                    await context.switch_to_tab(tab_id)
                    return ToolResult(output=f"Switched to tab {tab_id}")

                elif action == "new_tab":
                    if not url:
                        return ToolResult(error="URL is required for 'new_tab' action")
                    await context.create_new_tab(url)
                    return ToolResult(output=f"Opened new tab with URL {url}")

                elif action == "close_tab":
                    await context.close_current_tab()
                    return ToolResult(output="Closed current tab")

                elif action == "refresh":
                    await context.refresh_page()
                    return ToolResult(output="Refreshed current page")

                else:
                    return ToolResult(error=f"Unknown action: {action}")

            except Exception as e:
                # Tool boundary: report failures as a ToolResult instead of
                # letting the exception propagate to the caller.
                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

    async def get_current_state(self) -> ToolResult:
        """Get the current browser state as a ToolResult.

        Returns:
            ToolResult whose output is a JSON string with the keys ``url``,
            ``title``, ``tabs`` and ``interactive_elements``, or a ToolResult
            with ``error`` set if the state could not be read.
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()
                state = await context.get_state()
                state_info = {
                    "url": state.url,
                    "title": state.title,
                    "tabs": [tab.model_dump() for tab in state.tabs],
                    "interactive_elements": state.element_tree.clickable_elements_to_string(),
                }
                return ToolResult(output=json.dumps(state_info))
            except Exception as e:
                return ToolResult(error=f"Failed to get browser state: {str(e)}")

    async def cleanup(self):
        """Clean up browser resources.

        The browser is closed even when closing the context raises, and the
        stored references are always cleared so a later call (for example from
        ``__del__``) does not try to close the same objects again. Any
        exception from the close calls still propagates.
        """
        async with self.lock:
            try:
                if self.context is not None:
                    await self.context.close()
            finally:
                self.context = None
                self.dom_service = None
                if self.browser is not None:
                    try:
                        await self.browser.close()
                    finally:
                        self.browser = None

    def __del__(self):
        """Ensure cleanup when object is destroyed."""
        if self.browser is None and self.context is None:
            return
        try:
            asyncio.run(self.cleanup())
        except RuntimeError:
            # asyncio.run() refuses to start while an event loop is already
            # running in this thread; retry on a private loop.
            # NOTE(review): if a loop really is running in this thread,
            # run_until_complete() is expected to fail as well — confirm the
            # intended behaviour for that case.
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.cleanup())
            finally:
                # Always release the private loop, even if cleanup failed.
                loop.close()

59
app/tool/file_saver.py Normal file
View File

@ -0,0 +1,59 @@
import os
from typing import Optional
from pathlib import Path
from app.tool.base import BaseTool
class FileSaver(BaseTool):
    """Tool that writes text content to a file on the local filesystem."""

    name: str = "file_saver"
    description: str = """Save content to a local file at a specified path.
Use this tool when you need to save text, code, or generated content to a file on the local filesystem.
The tool accepts content and a file path, and saves the content to that location.
"""
    parameters: dict = {
        "type": "object",
        "properties": {
            "content": {
                "type": "string",
                "description": "(required) The content to save to the file.",
            },
            "file_path": {
                "type": "string",
                "description": "(required) The path where the file should be saved, including filename and extension.",
            },
            "mode": {
                "type": "string",
                "description": "(optional) The file opening mode. Default is 'w' for write. Use 'a' for append.",
                "enum": ["w", "a"],
                "default": "w",
            },
        },
        "required": ["content", "file_path"],
    }

    async def execute(self, content: str, file_path: str, mode: str = "w") -> str:
        """Save content to a file at the specified path.

        Missing parent directories are created first. The file is opened with
        UTF-8 encoding.

        Args:
            content (str): The content to save to the file.
            file_path (str): The path where the file should be saved.
            mode (str, optional): The file opening mode. Default is 'w' for
                write. Use 'a' for append.

        Returns:
            str: A success message naming the file, or a message starting with
            "Error saving file:" if any step raised.
        """
        try:
            directory = os.path.dirname(file_path)
            # A bare filename has no directory component; nothing to create.
            if directory:
                # exist_ok avoids the check-then-create race of testing
                # os.path.exists() before calling makedirs().
                os.makedirs(directory, exist_ok=True)

            with open(file_path, mode, encoding="utf-8") as file:
                file.write(content)

            return f"Content successfully saved to {file_path}"
        except Exception as e:
            # Tool boundary: report the failure as text rather than raising.
            return f"Error saving file: {str(e)}"

48
app/tool/google_search.py Normal file
View File

@ -0,0 +1,48 @@
import asyncio
from typing import Optional, List
from googlesearch import search
from app.tool.base import BaseTool
class GoogleSearch(BaseTool):
    """Tool that runs a Google query and returns the resulting URLs."""

    name: str = "google_search"
    description: str = """Perform a Google search and return a list of relevant links.
Use this tool when you need to find information on the web, get up-to-date data, or research specific topics.
The tool returns a list of URLs that match the search query.
"""
    parameters: dict = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "(required) The search query to submit to Google.",
            },
            "num_results": {
                "type": "integer",
                "description": "(optional) The number of search results to return. Default is 10.",
                "default": 10,
            },
        },
        "required": ["query"],
    }

    async def execute(self, query: str, num_results: int = 10) -> List[str]:
        """Execute a Google search and return a list of URLs.

        Args:
            query (str): The search query to submit to Google.
            num_results (int, optional): The number of search results to
                return. Default is 10.

        Returns:
            List[str]: A list of URLs matching the search query.
        """
        # Inside a coroutine a loop is always running; get_running_loop() is
        # the preferred accessor and never creates a loop implicitly.
        loop = asyncio.get_running_loop()
        # search() is a blocking call, so run it on the default executor to
        # keep the event loop responsive.
        return await loop.run_in_executor(
            None,
            lambda: list(search(query, num_results=num_results)),
        )

View File

@ -0,0 +1,70 @@
import threading
from typing import Dict
from app.tool.base import BaseTool
class PythonExecute(BaseTool):
    """A tool for executing a Python code string with a timeout.

    SECURITY NOTE: the code is run with ``exec`` inside this process and has
    access to the full set of builtins (including ``__import__`` and
    ``open``). Nothing here sandboxes it; only pass code you are willing to
    run with this process's privileges.
    """

    name: str = "python_execute"
    description: str = "Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results."
    parameters: dict = {
        "type": "object",
        "properties": {
            "code": {
                "type": "string",
                "description": "The Python code to execute.",
            },
        },
        "required": ["code"],
    }

    async def execute(
        self,
        code: str,
        timeout: int = 5,
    ) -> Dict:
        """Execute the provided Python code with a timeout.

        Args:
            code (str): The Python code to execute.
            timeout (int): Execution timeout in seconds.

        Returns:
            Dict: ``'observation'`` holds the captured stdout, or the
            exception message if the code raised, or a timeout notice.
            ``'success'`` is True when the code ran to completion and False
            on an exception or timeout.
        """
        import builtins
        import sys
        from io import StringIO

        result = {"observation": ""}

        def run_code():
            output_buffer = StringIO()
            # NOTE: sys.stdout is process-wide, so output printed by other
            # threads while the snippet runs is captured as well.
            sys.stdout = output_buffer
            try:
                # `__builtins__` is a module in `__main__` but a dict in
                # imported modules; go through the builtins module so this
                # works in both situations.
                namespace = {"__builtins__": dict(vars(builtins))}
                # Use one dict for globals and locals: with separate dicts,
                # top-level functions defined by the snippet cannot see each
                # other (names land in locals, lookups go to globals).
                exec(code, namespace, namespace)
                result["observation"] = output_buffer.getvalue()
                result["success"] = True
            except Exception as e:
                result["observation"] = str(e)
                result["success"] = False
            finally:
                # Restore stdout even when the snippet raises.
                sys.stdout = sys.__stdout__

        # Daemon thread: code that never finishes must not keep the
        # interpreter from exiting.
        thread = threading.Thread(target=run_code, daemon=True)
        thread.start()
        thread.join(timeout)

        if thread.is_alive():
            # The worker cannot be killed; it keeps running in the background
            # and stdout stays redirected until it finishes.
            return {
                "observation": f"Execution timeout after {timeout} seconds",
                "success": False,
            }

        return result