# OpenManus/app/tool/browser_use_tool.py
# 2025-03-17 23:50:42 +08:00
#
# 564 lines
# 24 KiB
# Python

import asyncio
import json
from typing import Generic, Optional, TypeVar
from browser_use import Browser as BrowserUseBrowser
from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.dom.service import DomService
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo
from app.config import config
from app.tool.base import BaseTool, ToolResult
from app.tool.web_search import WebSearch
# Human/LLM-facing description of the tool; assigned to
# ``BrowserUseTool.description`` below. It lists every action name that
# ``BrowserUseTool.execute`` dispatches on, grouped by purpose.
_BROWSER_DESCRIPTION = """
Interact with a web browser to perform various actions such as navigation, element interaction, content extraction, and tab management. This tool provides a comprehensive set of browser automation capabilities:
Navigation:
- 'navigate': Go to a specific URL
- 'go_back': Navigate back in browser history
- 'refresh': Refresh the current page
- 'web_search': Search the web with a specific query
Element Interaction:
- 'click_element': Click an element by index
- 'input_text': Input text into a form element
- 'scroll_down'/'scroll_up': Scroll the page (with optional pixel amount)
- 'scroll_to_text': Scroll to specific text on the page
- 'send_keys': Send keyboard shortcuts or special keys
- 'get_dropdown_options': Get all options from a dropdown
- 'select_dropdown_option': Select an option from a dropdown by text
Content Extraction:
- 'get_current_state': Get detailed browser state including URL, title, tabs, and interactive elements
- 'get_html': Get page HTML content
- 'get_text': Get text content of the page (supports start_index and end_index parameters)
- 'read_links': Get all links on the page
- 'extract_content': Extract specific information from the page using AI
- 'screenshot': Capture a screenshot
Tab Management:
- 'switch_tab': Switch to a specific tab
- 'open_tab': Open a new tab with a URL
- 'close_tab': Close the current tab
Utility:
- 'wait': Wait for a specified number of seconds
- 'execute_js': Execute JavaScript code on the page
Task Completion:
- 'done': Complete the task and return results
Each action requires specific parameters. Use get_current_state first to understand the current browser context.
"""
# Type variable for the optional caller-supplied object stored in
# ``BrowserUseTool.tool_context``; the tool itself never inspects it.
Context = TypeVar("Context")
class BrowserUseTool(BaseTool, Generic[Context]):
    """Tool that drives a ``browser_use`` browser session through named actions.

    A single browser and browser context are created lazily on the first
    ``execute`` call and reused afterwards. Every action runs while holding
    ``self.lock``, so actions issued against one tool instance are serialized.
    Call ``cleanup`` to close the context and the browser.
    """

    name: str = "browser_use"
    description: str = _BROWSER_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                # Must list every action handled in ``execute``; a caller that
                # honours the enum cannot select an action missing from here.
                "enum": [
                    "navigate",
                    "click_element",
                    "get_current_state",
                    "input_text",
                    "screenshot",
                    "get_html",
                    "get_text",
                    "read_links",
                    "extract_content",
                    "execute_js",
                    "scroll_down",
                    "scroll_up",
                    "scroll_to_text",
                    "send_keys",
                    "get_dropdown_options",
                    "select_dropdown_option",
                    "go_back",
                    "refresh",
                    "web_search",
                    "switch_tab",
                    "open_tab",
                    "close_tab",
                    "wait",
                    "done",
                ],
                "description": "The browser action to perform",
            },
            "url": {
                "type": "string",
                "description": "URL for 'navigate' or 'open_tab' actions",
            },
            "index": {
                "type": "integer",
                "description": "Element index (retrieved using get_current_state) for 'click_element' or 'input_text' actions",
            },
            "text": {
                "type": "string",
                "description": "Text for 'input_text', 'scroll_to_text', 'select_dropdown_option' or 'done' actions",
            },
            "script": {
                "type": "string",
                "description": "JavaScript code for 'execute_js' action",
            },
            "scroll_amount": {
                "type": "integer",
                "description": "Pixels to scroll (positive for down, negative for up) for 'scroll_down' or 'scroll_up' actions",
            },
            "tab_id": {
                "type": "integer",
                "description": "Tab ID for 'switch_tab' action",
            },
            "start_index": {
                "type": "integer",
                "description": "Starting character index for text observation (for 'get_text' action)",
            },
            "end_index": {
                "type": "integer",
                "description": "Ending character index for text observation (for 'get_text' action)",
            },
            "query": {
                "type": "string",
                "description": "Search query for 'web_search' action",
            },
            "goal": {
                "type": "string",
                "description": "Extraction goal for 'extract_content' action",
            },
            "success": {
                "type": "boolean",
                "description": "Success status for 'done' action",
            },
            "keys": {
                "type": "string",
                "description": "Keys to send for 'send_keys' action",
            },
            "seconds": {
                "type": "integer",
                "description": "Seconds to wait for 'wait' action",
            },
        },
        "required": ["action"],
        # NOTE(review): this mapping is keyed by action *value*, not by
        # property name, so it is presumably advisory metadata rather than
        # standard JSON Schema ``dependencies`` - confirm how consumers read it.
        "dependencies": {
            "navigate": ["url"],
            "click_element": ["index"],
            "input_text": ["index", "text"],
            "execute_js": ["script"],
            "switch_tab": ["tab_id"],
            "open_tab": ["url"],
            "scroll_down": ["scroll_amount"],
            "scroll_up": ["scroll_amount"],
            "scroll_to_text": ["text"],
            "send_keys": ["keys"],
            "get_dropdown_options": ["index"],
            "select_dropdown_option": ["index", "text"],
            "extract_content": ["goal"],
            "go_back": [],
            "web_search": ["query"],
            "wait": ["seconds"],
            "done": ["text"],
        },
    }

    # Serializes ``execute`` and ``cleanup`` on this instance.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
    # Lazily created by ``_ensure_browser_initialized``; reset by ``cleanup``.
    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
    context: Optional[BrowserContext] = Field(default=None, exclude=True)
    dom_service: Optional[DomService] = Field(default=None, exclude=True)
    # Backend used by the 'web_search' action.
    web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)

    # Context for generic functionality
    tool_context: Optional[Context] = Field(default=None, exclude=True)

    @field_validator("parameters", mode="before")
    @classmethod
    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
        """Reject an empty ``parameters`` schema.

        Raises:
            ValueError: if ``v`` is falsy (``None`` or an empty dict).
        """
        if not v:
            raise ValueError("Parameters cannot be empty")
        return v

    async def _ensure_browser_initialized(self) -> BrowserContext:
        """Create the browser and browser context on first use and return the context.

        Browser options start from ``headless=False, disable_security=True``
        and are overridden by whatever ``config.browser_config`` provides.
        """
        if self.browser is None:
            browser_config_kwargs = {"headless": False, "disable_security": True}

            if config.browser_config:
                from browser_use.browser.browser import ProxySettings

                # handle proxy settings.
                if config.browser_config.proxy and config.browser_config.proxy.server:
                    browser_config_kwargs["proxy"] = ProxySettings(
                        server=config.browser_config.proxy.server,
                        username=config.browser_config.proxy.username,
                        password=config.browser_config.proxy.password,
                    )

                browser_attrs = [
                    "headless",
                    "disable_security",
                    "extra_chromium_args",
                    "chrome_instance_path",
                    "wss_url",
                    "cdp_url",
                ]

                for attr in browser_attrs:
                    value = getattr(config.browser_config, attr, None)
                    if value is not None:
                        # Skip empty lists so they do not override the
                        # library defaults; any other non-None value is used.
                        if not isinstance(value, list) or value:
                            browser_config_kwargs[attr] = value

            self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

        if self.context is None:
            context_config = BrowserContextConfig()

            # if there is context config in the config, use it.
            if (
                config.browser_config
                and hasattr(config.browser_config, "new_context_config")
                and config.browser_config.new_context_config
            ):
                context_config = config.browser_config.new_context_config

            self.context = await self.browser.new_context(context_config)
            self.dom_service = DomService(await self.context.get_current_page())

        return self.context

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        script: Optional[str] = None,
        scroll_amount: Optional[int] = None,
        tab_id: Optional[int] = None,
        start_index: Optional[int] = None,
        end_index: Optional[int] = None,
        query: Optional[str] = None,
        goal: Optional[str] = None,
        success: Optional[bool] = None,
        keys: Optional[str] = None,
        seconds: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        """
        Execute a specified browser action.

        The browser is initialized on demand. Missing required arguments and
        any exception raised while performing the action are reported through
        ``ToolResult(error=...)`` rather than raised.

        Args:
            action: The browser action to perform
            url: URL for 'navigate' or 'open_tab'
            index: Element index for click, input and dropdown actions
            text: Text for 'input_text', 'scroll_to_text',
                'select_dropdown_option' and 'done'
            script: JavaScript code for 'execute_js'
            scroll_amount: Pixels to scroll; defaults to the configured
                window height when omitted
            tab_id: Tab ID for 'switch_tab'
            start_index: Starting character index for 'get_text' (default 0)
            end_index: Ending character index for 'get_text' (defaults to the
                configured max content length)
            query: Search query for 'web_search'
            goal: Extraction goal for 'extract_content'
            success: Success status for 'done' (defaults to True)
            keys: Keys to send for 'send_keys'
            seconds: Seconds to wait for 'wait' (defaults to 3)
            **kwargs: Additional arguments (ignored)

        Returns:
            ToolResult with the action's output or error
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()

                # Get max content length from config
                max_content_length = getattr(
                    config.browser_config, "max_content_length", 2000
                )

                # Navigation actions
                if action == "navigate":
                    if not url:
                        return ToolResult(error="URL is required for 'navigate' action")
                    await context.navigate_to(url)
                    return ToolResult(output=f"Navigated to {url}")

                elif action == "go_back":
                    await context.go_back()
                    return ToolResult(output="Navigated back")

                elif action == "refresh":
                    await context.refresh_page()
                    return ToolResult(output="Refreshed current page")

                elif action == "web_search":
                    if not query:
                        return ToolResult(
                            error="Query is required for 'web_search' action"
                        )
                    search_results = await self.web_search_tool.execute(query)

                    if search_results:
                        # Navigate to the first search result
                        first_result = search_results[0]
                        await context.navigate_to(first_result)
                        # Newline after the label so the first result is not
                        # glued onto the "All results:" line.
                        return ToolResult(
                            output=f"Searched for '{query}' and navigated to first result: {first_result}\nAll results:\n"
                            + "\n".join(search_results)
                        )
                    else:
                        return ToolResult(
                            error=f"No search results found for '{query}'"
                        )

                # Element interaction actions
                elif action == "click_element":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'click_element' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    download_path = await context._click_element_node(element)
                    output = f"Clicked element at index {index}"
                    if download_path:
                        output += f" - Downloaded file to {download_path}"
                    return ToolResult(output=output)

                elif action == "input_text":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'input_text' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    await context._input_text_element_node(element, text)
                    return ToolResult(
                        output=f"Input '{text}' into element at index {index}"
                    )

                elif action == "scroll_down" or action == "scroll_up":
                    direction = 1 if action == "scroll_down" else -1
                    # Fall back to one window height when no amount is given.
                    amount = (
                        scroll_amount
                        if scroll_amount is not None
                        else context.config.browser_window_size["height"]
                    )
                    await context.execute_javascript(
                        f"window.scrollBy(0, {direction * amount});"
                    )
                    return ToolResult(
                        output=f"Scrolled {'down' if direction > 0 else 'up'} by {amount} pixels"
                    )

                elif action == "scroll_to_text":
                    if not text:
                        return ToolResult(
                            error="Text is required for 'scroll_to_text' action"
                        )
                    page = await context.get_current_page()
                    try:
                        locator = page.get_by_text(text, exact=False)
                        await locator.scroll_into_view_if_needed()
                        return ToolResult(output=f"Scrolled to text: '{text}'")
                    except Exception as e:
                        return ToolResult(error=f"Failed to scroll to text: {str(e)}")

                elif action == "send_keys":
                    if not keys:
                        return ToolResult(
                            error="Keys are required for 'send_keys' action"
                        )
                    page = await context.get_current_page()
                    await page.keyboard.press(keys)
                    return ToolResult(output=f"Sent keys: {keys}")

                elif action == "get_dropdown_options":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'get_dropdown_options' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    options = await page.evaluate(
                        """
                        (xpath) => {
                            const select = document.evaluate(xpath, document, null,
                                XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                            if (!select) return null;
                            return Array.from(select.options).map(opt => ({
                                text: opt.text,
                                value: opt.value,
                                index: opt.index
                            }));
                        }
                    """,
                        element.xpath,
                    )
                    return ToolResult(output=f"Dropdown options: {options}")

                elif action == "select_dropdown_option":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'select_dropdown_option' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # Playwright only auto-detects XPath for selectors that
                    # start with "//" or ".."; anything else is parsed as CSS.
                    # The explicit engine prefix works for either xpath form.
                    await page.select_option(f"xpath={element.xpath}", label=text)
                    return ToolResult(
                        output=f"Selected option '{text}' from dropdown at index {index}"
                    )

                # Content extraction actions
                elif action == "get_current_state":
                    return await self.get_current_state(context)

                elif action == "get_html":
                    html = await context.get_page_html()
                    truncated = (
                        html[:max_content_length] + "..."
                        if len(html) > max_content_length
                        else html
                    )
                    return ToolResult(output=truncated)

                elif action == "get_text":
                    start = start_index if start_index is not None else 0
                    end = end_index if end_index is not None else max_content_length
                    # Slice inside the page so only the requested window is
                    # transferred back; a local name avoids clobbering ``text``.
                    page_text = await context.execute_javascript(
                        f"document.body.innerText.substring({start}, {end})"
                    )
                    full_length = await context.execute_javascript(
                        "document.body.innerText.length"
                    )
                    result = f"Text from index {start} to {end}:\n{page_text}"
                    if end < full_length:
                        result += f"\n\n[Text continues... {full_length - end} more characters available]"
                    if start > 0:
                        result += f"\n[{start} characters before this point]"
                    return ToolResult(output=result)

                elif action == "read_links":
                    links = await context.execute_javascript(
                        "Array.from(document.querySelectorAll('a[href]')).map(elem => elem.innerText && elem.href ? `${elem.innerText.trim()} - ${elem.href}` : null).filter(Boolean).join('\\n')"
                    )
                    return ToolResult(output=links)

                elif action == "extract_content":
                    if not goal:
                        return ToolResult(
                            error="Goal is required for 'extract_content' action"
                        )
                    await context.get_page_html()
                    # Note: In a real implementation, this would use an LLM to extract content
                    return ToolResult(output=f"Extracted content for goal: {goal}")

                elif action == "screenshot":
                    screenshot = await context.take_screenshot(full_page=True)
                    return ToolResult(
                        output=f"Screenshot captured (base64 length: {len(screenshot)})",
                        system=screenshot,
                    )

                # Tab management actions
                elif action == "switch_tab":
                    if tab_id is None:
                        return ToolResult(
                            error="Tab ID is required for 'switch_tab' action"
                        )
                    await context.switch_to_tab(tab_id)
                    return ToolResult(output=f"Switched to tab {tab_id}")

                elif action == "open_tab":
                    if not url:
                        return ToolResult(error="URL is required for 'open_tab' action")
                    await context.create_new_tab(url)
                    return ToolResult(output=f"Opened new tab with URL {url}")

                elif action == "close_tab":
                    await context.close_current_tab()
                    return ToolResult(output="Closed current tab")

                # Utility actions
                elif action == "wait":
                    seconds_to_wait = seconds if seconds is not None else 3
                    await asyncio.sleep(seconds_to_wait)
                    return ToolResult(output=f"Waited for {seconds_to_wait} seconds")

                elif action == "execute_js":
                    if not script:
                        return ToolResult(
                            error="Script is required for 'execute_js' action"
                        )
                    result = await context.execute_javascript(script)
                    return ToolResult(output=str(result))

                # Task completion
                elif action == "done":
                    if not text:
                        return ToolResult(error="Text is required for 'done' action")
                    success_value = success if success is not None else True
                    return ToolResult(output=text, is_done=True, success=success_value)

                else:
                    return ToolResult(error=f"Unknown action: {action}")

            except Exception as e:
                # Tool boundary: report the failure to the caller as a result
                # object instead of propagating it.
                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

    async def get_current_state(self, context: BrowserContext) -> ToolResult:
        """Get the current browser state as a ToolResult.

        The output is a JSON document with the page URL and title, the open
        tabs, the clickable elements rendered as text, and scroll offsets.
        Failures are returned as ``ToolResult(error=...)``.
        """
        try:
            state = await context.get_state()
            state_info = {
                "url": state.url,
                "title": state.title,
                "tabs": [tab.model_dump() for tab in state.tabs],
                "help": "[0], [1], [2], etc., represent clickable indices corresponding to the elements listed. Clicking on these indices will navigate to or interact with the respective content behind them.",
                "interactive_elements": state.element_tree.clickable_elements_to_string(),
                "scroll_info": {
                    "pixels_above": state.pixels_above,
                    "pixels_below": state.pixels_below,
                    # Viewport height counts as 0 when viewport_info is unset.
                    "total_height": state.pixels_above
                    + state.pixels_below
                    + (state.viewport_info.height if state.viewport_info else 0),
                },
            }
            return ToolResult(
                output=json.dumps(state_info, indent=4, ensure_ascii=False)
            )
        except Exception as e:
            return ToolResult(error=f"Failed to get browser state: {str(e)}")

    async def cleanup(self):
        """Clean up browser resources.

        Closes the context, then the browser, and clears the cached
        references. The browser is closed even if closing the context raises;
        the original exception still propagates afterwards.
        """
        async with self.lock:
            try:
                if self.context is not None:
                    await self.context.close()
            finally:
                self.context = None
                self.dom_service = None
                if self.browser is not None:
                    try:
                        await self.browser.close()
                    finally:
                        self.browser = None

    def __del__(self):
        """Ensure cleanup when object is destroyed (best effort)."""
        # getattr with a default: the fields may be missing if construction
        # failed before they were set.
        if (
            getattr(self, "browser", None) is None
            and getattr(self, "context", None) is None
        ):
            return
        try:
            asyncio.run(self.cleanup())
        except RuntimeError:
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.cleanup())
            finally:
                # Always release the temporary loop, even if cleanup fails.
                loop.close()

    @classmethod
    def create_with_context(cls, context: Context) -> "BrowserUseTool[Context]":
        """Factory method to create a BrowserUseTool with a specific context."""
        tool = cls()
        tool.tool_context = context
        return tool