From 04a1e0a504f0de4dade1fa5617cdfad89fac1d9c Mon Sep 17 00:00:00 2001
From: xiangjinyu
Date: Thu, 6 Mar 2025 23:10:14 +0800
Subject: [PATCH] add 4 tools PythonExecute(), GoogleSearch(), BrowserUseTool(), FileSaver()

---
 app/tool/browser_use_tool.py | 250 +++++++++++++++++++++++++++++++++++
 app/tool/file_saver.py       |  61 +++++++++
 app/tool/google_search.py    |  50 +++++++
 app/tool/python_execute.py   |  76 +++++++++++
 4 files changed, 437 insertions(+)
 create mode 100644 app/tool/browser_use_tool.py
 create mode 100644 app/tool/file_saver.py
 create mode 100644 app/tool/google_search.py
 create mode 100644 app/tool/python_execute.py

diff --git a/app/tool/browser_use_tool.py b/app/tool/browser_use_tool.py
new file mode 100644
index 0000000..82150a2
--- /dev/null
+++ b/app/tool/browser_use_tool.py
@@ -0,0 +1,250 @@
+import asyncio
+import json
+from typing import Optional
+
+from browser_use import Browser as BrowserUseBrowser, BrowserConfig
+from browser_use.browser.context import BrowserContext
+from browser_use.dom.service import DomService
+from pydantic import Field, field_validator
+from pydantic_core.core_schema import ValidationInfo
+
+from app.tool.base import BaseTool, ToolResult
+
+_BROWSER_DESCRIPTION = """
+Interact with a web browser to perform various actions such as navigation, element interaction,
+content extraction, and tab management. Supported actions include:
+- 'navigate': Go to a specific URL
+- 'click': Click an element by index
+- 'input_text': Input text into an element
+- 'screenshot': Capture a screenshot
+- 'get_html': Get page HTML content
+- 'execute_js': Execute JavaScript code
+- 'scroll': Scroll the page
+- 'switch_tab': Switch to a specific tab
+- 'new_tab': Open a new tab
+- 'close_tab': Close the current tab
+- 'refresh': Refresh the current page
+"""
+
+
+class BrowserUseTool(BaseTool):
+    """Expose browser actions backed by one lazily created browser session per instance."""
+
+    name: str = "browser_use"
+    description: str = _BROWSER_DESCRIPTION
+    parameters: dict = {
+        "type": "object",
+        "properties": {
+            "action": {
+                "type": "string",
+                "enum": [
+                    "navigate", "click", "input_text", "screenshot", "get_html", "execute_js",
+                    "scroll", "switch_tab", "new_tab", "close_tab", "refresh"
+                ],
+                "description": "The browser action to perform"
+            },
+            "url": {
+                "type": "string",
+                "description": "URL for 'navigate' or 'new_tab' actions"
+            },
+            "index": {
+                "type": "integer",
+                "description": "Element index for 'click' or 'input_text' actions"
+            },
+            "text": {
+                "type": "string",
+                "description": "Text for 'input_text' action"
+            },
+            "script": {
+                "type": "string",
+                "description": "JavaScript code for 'execute_js' action"
+            },
+            "scroll_amount": {
+                "type": "integer",
+                "description": "Pixels to scroll (positive for down, negative for up) for 'scroll' action"
+            },
+            "tab_id": {
+                "type": "integer",
+                "description": "Tab ID for 'switch_tab' action"
+            }
+        },
+        "required": ["action"],
+        # NOTE(review): JSON Schema "dependencies" is keyed by property names, not by values of
+        # "action", so these entries document intent only; execute() enforces them at runtime.
+        "dependencies": {
+            "navigate": ["url"],
+            "click": ["index"],
+            "input_text": ["index", "text"],
+            "execute_js": ["script"],
+            "switch_tab": ["tab_id"],
+            "new_tab": ["url"],
+            "scroll": ["scroll_amount"]
+        }
+    }
+
+    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
+    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
+    context: Optional[BrowserContext] = Field(default=None, exclude=True)
+    dom_service: Optional[DomService] = Field(default=None, exclude=True)
+
+    @field_validator('parameters', mode='before')
+    @classmethod
+    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
+        """Reject an empty parameters schema; return any non-empty value unchanged."""
+        if not v:
+            raise ValueError("Parameters cannot be empty")
+        return v
+
+    async def _ensure_browser_initialized(self) -> BrowserContext:
+        """Ensure browser and context are initialized."""
+        if self.browser is None:
+            self.browser = BrowserUseBrowser(BrowserConfig(headless=False))
+        if self.context is None:
+            self.context = await self.browser.new_context()
+            self.dom_service = DomService(await self.context.get_current_page())
+        return self.context
+
+    async def execute(self, action: str, url: Optional[str] = None, index: Optional[int] = None,
+                      text: Optional[str] = None, script: Optional[str] = None,
+                      scroll_amount: Optional[int] = None, tab_id: Optional[int] = None,
+                      **kwargs) -> ToolResult:
+        """
+        Execute a specified browser action.
+
+        Args:
+            action: The browser action to perform
+            url: URL for navigation or new tab
+            index: Element index for click or input actions
+            text: Text for input action
+            script: JavaScript code for execution
+            scroll_amount: Pixels to scroll for scroll action
+            tab_id: Tab ID for switch_tab action
+            **kwargs: Additional arguments
+
+        Returns:
+            ToolResult with the action's output or error
+        """
+        async with self.lock:
+            try:
+                context = await self._ensure_browser_initialized()
+
+                if action == "navigate":
+                    if not url:
+                        return ToolResult(error="URL is required for 'navigate' action")
+                    await context.navigate_to(url)
+                    return ToolResult(output=f"Navigated to {url}")
+
+                elif action == "click":
+                    if index is None:
+                        return ToolResult(error="Index is required for 'click' action")
+                    element = await context.get_dom_element_by_index(index)
+                    if not element:
+                        return ToolResult(error=f"Element with index {index} not found")
+                    download_path = await context._click_element_node(element)
+                    output = f"Clicked element at index {index}"
+                    if download_path:
+                        output += f" - Downloaded file to {download_path}"
+                    return ToolResult(output=output)
+
+                elif action == "input_text":
+                    if index is None or not text:
+                        return ToolResult(error="Index and text are required for 'input_text' action")
+                    element = await context.get_dom_element_by_index(index)
+                    if not element:
+                        return ToolResult(error=f"Element with index {index} not found")
+                    await context._input_text_element_node(element, text)
+                    return ToolResult(output=f"Input '{text}' into element at index {index}")
+
+                elif action == "screenshot":
+                    screenshot = await context.take_screenshot(full_page=True)
+                    return ToolResult(
+                        output=f"Screenshot captured (base64 length: {len(screenshot)})",
+                        system=screenshot
+                    )
+
+                elif action == "get_html":
+                    html = await context.get_page_html()
+                    truncated = html[:2000] + "..." if len(html) > 2000 else html
+                    return ToolResult(output=truncated)
+
+                elif action == "execute_js":
+                    if not script:
+                        return ToolResult(error="Script is required for 'execute_js' action")
+                    result = await context.execute_javascript(script)
+                    return ToolResult(output=str(result))
+
+                elif action == "scroll":
+                    if scroll_amount is None:
+                        return ToolResult(error="Scroll amount is required for 'scroll' action")
+                    await context.execute_javascript(f"window.scrollBy(0, {scroll_amount});")
+                    direction = "down" if scroll_amount > 0 else "up"
+                    return ToolResult(output=f"Scrolled {direction} by {abs(scroll_amount)} pixels")
+
+                elif action == "switch_tab":
+                    if tab_id is None:
+                        return ToolResult(error="Tab ID is required for 'switch_tab' action")
+                    await context.switch_to_tab(tab_id)
+                    return ToolResult(output=f"Switched to tab {tab_id}")
+
+                elif action == "new_tab":
+                    if not url:
+                        return ToolResult(error="URL is required for 'new_tab' action")
+                    await context.create_new_tab(url)
+                    return ToolResult(output=f"Opened new tab with URL {url}")
+
+                elif action == "close_tab":
+                    await context.close_current_tab()
+                    return ToolResult(output="Closed current tab")
+
+                elif action == "refresh":
+                    await context.refresh_page()
+                    return ToolResult(output="Refreshed current page")
+
+                else:
+                    return ToolResult(error=f"Unknown action: {action}")
+
+            except Exception as e:
+                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")
+
+    async def get_current_state(self) -> ToolResult:
+        """Get the current browser state as a ToolResult."""
+        async with self.lock:
+            try:
+                context = await self._ensure_browser_initialized()
+                state = await context.get_state()
+                state_info = {
+                    "url": state.url,
+                    "title": state.title,
+                    "tabs": [tab.model_dump() for tab in state.tabs],
+                    "interactive_elements": state.element_tree.clickable_elements_to_string()
+                }
+                return ToolResult(output=json.dumps(state_info))
+            except Exception as e:
+                return ToolResult(error=f"Failed to get browser state: {str(e)}")
+
+    async def cleanup(self):
+        """Clean up browser resources."""
+        async with self.lock:
+            if self.context is not None:
+                await self.context.close()
+                self.context = None
+                self.dom_service = None
+            if self.browser is not None:
+                await self.browser.close()
+                self.browser = None
+
+    def __del__(self):
+        """Best-effort cleanup when the object is garbage-collected."""
+        # getattr guards: __del__ may run on an instance whose fields were never set.
+        if getattr(self, "browser", None) is None and getattr(self, "context", None) is None:
+            return
+        try:
+            asyncio.run(self.cleanup())
+        except RuntimeError:
+            # asyncio.run() raises RuntimeError inside a running loop; retry on a private loop.
+            loop = asyncio.new_event_loop()
+            try:
+                loop.run_until_complete(self.cleanup())
+            finally:
+                # Close the loop even if cleanup fails so it is not leaked.
+                loop.close()
diff --git a/app/tool/file_saver.py b/app/tool/file_saver.py
new file mode 100644
index 0000000..a32835d
--- /dev/null
+++ b/app/tool/file_saver.py
@@ -0,0 +1,61 @@
+import os
+from typing import Optional
+from pathlib import Path
+
+from app.tool.base import BaseTool
+
+
+class FileSaver(BaseTool):
+    """Tool that writes text content to a file on the local filesystem."""
+
+    name: str = "file_saver"
+    description: str = """Save content to a local file at a specified path.
+Use this tool when you need to save text, code, or generated content to a file on the local filesystem.
+The tool accepts content and a file path, and saves the content to that location.
+"""
+    parameters: dict = {
+        "type": "object",
+        "properties": {
+            "content": {
+                "type": "string",
+                "description": "(required) The content to save to the file."
+            },
+            "file_path": {
+                "type": "string",
+                "description": "(required) The path where the file should be saved, including filename and extension."
+            },
+            "mode": {
+                "type": "string",
+                "description": "(optional) The file opening mode. Default is 'w' for write. Use 'a' for append.",
+                "enum": ["w", "a"],
+                "default": "w"
+            }
+        },
+        "required": ["content", "file_path"]
+    }
+
+    async def execute(self, content: str, file_path: str, mode: str = "w") -> str:
+        """
+        Save content to a file at the specified path.
+
+        Args:
+            content (str): The content to save to the file.
+            file_path (str): The path where the file should be saved.
+            mode (str, optional): The file opening mode. Default is 'w' for write. Use 'a' for append.
+
+        Returns:
+            str: A success message, or an "Error saving file: ..." message if saving failed.
+        """
+        try:
+            # exist_ok=True: no failure if the directory already exists or is created concurrently
+            directory = os.path.dirname(file_path)
+            if directory:
+                os.makedirs(directory, exist_ok=True)
+
+            # Write directly to the file
+            with open(file_path, mode, encoding='utf-8') as file:
+                file.write(content)
+
+            return f"Content successfully saved to {file_path}"
+        except Exception as e:
+            return f"Error saving file: {str(e)}"
diff --git a/app/tool/google_search.py b/app/tool/google_search.py
new file mode 100644
index 0000000..b0c3e6f
--- /dev/null
+++ b/app/tool/google_search.py
@@ -0,0 +1,50 @@
+import asyncio
+from typing import Optional, List
+from googlesearch import search
+
+from app.tool.base import BaseTool
+
+
+class GoogleSearch(BaseTool):
+    """Tool that runs a Google search and returns the result URLs."""
+
+    name: str = "google_search"
+    description: str = """Perform a Google search and return a list of relevant links.
+Use this tool when you need to find information on the web, get up-to-date data, or research specific topics.
+The tool returns a list of URLs that match the search query.
+"""
+    parameters: dict = {
+        "type": "object",
+        "properties": {
+            "query": {
+                "type": "string",
+                "description": "(required) The search query to submit to Google."
+            },
+            "num_results": {
+                "type": "integer",
+                "description": "(optional) The number of search results to return. Default is 10.",
+                "default": 10
+            }
+        },
+        "required": ["query"]
+    }
+
+    async def execute(self, query: str, num_results: int = 10) -> List[str]:
+        """
+        Execute a Google search and return a list of URLs.
+
+        Args:
+            query (str): The search query to submit to Google.
+            num_results (int, optional): The number of search results to return. Default is 10.
+
+        Returns:
+            List[str]: A list of URLs matching the search query.
+        """
+        # Run the blocking search in the default executor so the event loop is not blocked
+        loop = asyncio.get_running_loop()
+        links = await loop.run_in_executor(
+            None,
+            lambda: list(search(query, num_results=num_results))
+        )
+
+        return links
diff --git a/app/tool/python_execute.py b/app/tool/python_execute.py
new file mode 100644
index 0000000..88e1aab
--- /dev/null
+++ b/app/tool/python_execute.py
@@ -0,0 +1,76 @@
+import builtins
+import threading
+from contextlib import redirect_stdout
+from io import StringIO
+from typing import Dict
+
+from app.tool.base import BaseTool
+
+
+class PythonExecute(BaseTool):
+    """A tool for executing Python code in a worker thread with a timeout."""
+
+    name: str = "python_execute"
+    description: str = "Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results."
+    parameters: dict = {
+        "type": "object",
+        "properties": {
+            "code": {
+                "type": "string",
+                "description": "The Python code to execute.",
+            },
+        },
+        "required": ["code"],
+    }
+
+    async def execute(
+        self,
+        code: str,
+        timeout: int = 5,
+    ) -> Dict:
+        """
+        Executes the provided Python code with a timeout.
+
+        Args:
+            code (str): The Python code to execute.
+            timeout (int): Execution timeout in seconds.
+
+        Returns:
+            Dict: Contains 'observation' with the captured print output (or the error
+                message / timeout notice) and 'success' with the execution status.
+
+        Note:
+            The worker thread cannot be killed: code that exceeds the timeout keeps
+            running in the background after this method has returned.
+        """
+        result = {"observation": "", "success": False}
+
+        def run_code():
+            output_buffer = StringIO()
+            try:
+                # NOTE(review): the code gets a copy of the full builtins (including
+                # __import__), so exec() here is not a sandbox for untrusted input.
+                # One namespace for globals and locals: with two separate dicts exec()
+                # behaves like a class body and top-level functions cannot see each other.
+                namespace = {"__builtins__": dict(vars(builtins))}
+                # redirect_stdout restores the previous sys.stdout even if the code raises.
+                with redirect_stdout(output_buffer):
+                    exec(code, namespace)
+                result["observation"] = output_buffer.getvalue()
+                result["success"] = True
+            except Exception as e:
+                result["observation"] = str(e)
+                result["success"] = False
+
+        # daemon=True so a runaway snippet cannot keep the interpreter from exiting.
+        thread = threading.Thread(target=run_code, daemon=True)
+        thread.start()
+        thread.join(timeout)
+
+        if thread.is_alive():
+            return {
+                "observation": f"Execution timeout after {timeout} seconds",
+                "success": False,
+            }
+
+        return result