import asyncio import json from typing import Generic, Optional, TypeVar from browser_use import Browser as BrowserUseBrowser from browser_use import BrowserConfig from browser_use.browser.context import BrowserContext, BrowserContextConfig from browser_use.dom.service import DomService from pydantic import Field, field_validator from pydantic_core.core_schema import ValidationInfo from app.config import config from app.tool.base import BaseTool, ToolResult from app.tool.web_search import WebSearch _BROWSER_DESCRIPTION = """ Interact with a web browser to perform various actions such as navigation, element interaction, content extraction, and tab management. This tool provides a comprehensive set of browser automation capabilities: Navigation: - 'navigate': Go to a specific URL - 'go_back': Navigate back in browser history - 'refresh': Refresh the current page - 'web_search': Search the web with a specific query Element Interaction: - 'click_element': Click an element by index - 'input_text': Input text into a form element - 'scroll_down'/'scroll_up': Scroll the page (with optional pixel amount) - 'scroll_to_text': Scroll to specific text on the page - 'send_keys': Send keyboard shortcuts or special keys - 'get_dropdown_options': Get all options from a dropdown - 'select_dropdown_option': Select an option from a dropdown by text Content Extraction: - 'get_current_state': Get detailed browser state including URL, title, tabs, and interactive elements - 'get_html': Get page HTML content - 'get_text': Get text content of the page (supports start_index and end_index parameters) - 'read_links': Get all links on the page - 'extract_content': Extract specific information from the page using AI - 'screenshot': Capture a screenshot Tab Management: - 'switch_tab': Switch to a specific tab - 'open_tab': Open a new tab with a URL - 'close_tab': Close the current tab Utility: - 'wait': Wait for a specified number of seconds - 'execute_js': Execute JavaScript code on the page Task Completion: - 'done': Complete the task and return results Each action requires specific parameters. Use get_current_state first to understand the current browser context. """ Context = TypeVar("Context") class BrowserUseTool(BaseTool, Generic[Context]): name: str = "browser_use" description: str = _BROWSER_DESCRIPTION parameters: dict = { "type": "object", "properties": { "action": { "type": "string", "enum": [ "navigate", "click_element", "get_current_state", "input_text", "screenshot", "get_html", "get_text", "read_links", "execute_js", "scroll_down", "scroll_up", "scroll_to_text", "send_keys", "get_dropdown_options", "select_dropdown_option", "go_back", "web_search", "wait", "done", ], "description": "The browser action to perform", }, "url": { "type": "string", "description": "URL for 'navigate' or 'new_tab' actions", }, "index": { "type": "integer", "description": "Element index (retrieved using get_current_state) for 'click_element' or 'input_text' actions", }, "text": {"type": "string", "description": "Text for 'input_text' action"}, "script": { "type": "string", "description": "JavaScript code for 'execute_js' action", }, "scroll_amount": { "type": "integer", "description": "Pixels to scroll (positive for down, negative for up) for 'scroll_down' or 'scroll_up' actions", }, "tab_id": { "type": "integer", "description": "Tab ID for 'switch_tab' action", }, "start_index": { "type": "integer", "description": "Starting character index for text observation (for 'scroll_to_text' and 'get_text' actions)", }, "end_index": { "type": "integer", "description": "Ending character index for text observation (for 'scroll_to_text' and 'get_text' actions)", }, "query": { "type": "string", "description": "Search query for 'web_search' action", }, "goal": { "type": "string", "description": "Extraction goal for 'extract_content' action", }, "success": { "type": "boolean", "description": "Success status for 'done' action", }, "keys": { "type": "string", "description": "Keys to send for 'send_keys' action", }, "seconds": { "type": "integer", "description": "Seconds to wait for 'wait' action", }, }, "required": ["action"], "dependencies": { "navigate": ["url"], "click_element": ["index"], "input_text": ["index", "text"], "execute_js": ["script"], "switch_tab": ["tab_id"], "new_tab": ["url"], "scroll_down": ["scroll_amount"], "scroll_up": ["scroll_amount"], "scroll_to_text": ["text"], "send_keys": ["keys"], "get_dropdown_options": ["index"], "select_dropdown_option": ["index", "text"], "go_back": [], "web_search": ["query"], "wait": ["seconds"], "done": ["text"], }, } lock: asyncio.Lock = Field(default_factory=asyncio.Lock) browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True) context: Optional[BrowserContext] = Field(default=None, exclude=True) dom_service: Optional[DomService] = Field(default=None, exclude=True) web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True) # Context for generic functionality tool_context: Optional[Context] = Field(default=None, exclude=True) @field_validator("parameters", mode="before") def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict: if not v: raise ValueError("Parameters cannot be empty") return v async def _ensure_browser_initialized(self) -> BrowserContext: """Ensure browser and context are initialized.""" if self.browser is None: browser_config_kwargs = {"headless": False, "disable_security": True} if config.browser_config: from browser_use.browser.browser import ProxySettings # handle proxy settings. if config.browser_config.proxy and config.browser_config.proxy.server: browser_config_kwargs["proxy"] = ProxySettings( server=config.browser_config.proxy.server, username=config.browser_config.proxy.username, password=config.browser_config.proxy.password, ) browser_attrs = [ "headless", "disable_security", "extra_chromium_args", "chrome_instance_path", "wss_url", "cdp_url", ] for attr in browser_attrs: value = getattr(config.browser_config, attr, None) if value is not None: if not isinstance(value, list) or value: browser_config_kwargs[attr] = value self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs)) if self.context is None: context_config = BrowserContextConfig() # if there is context config in the config, use it. if ( config.browser_config and hasattr(config.browser_config, "new_context_config") and config.browser_config.new_context_config ): context_config = config.browser_config.new_context_config self.context = await self.browser.new_context(context_config) self.dom_service = DomService(await self.context.get_current_page()) return self.context async def execute( self, action: str, url: Optional[str] = None, index: Optional[int] = None, text: Optional[str] = None, script: Optional[str] = None, scroll_amount: Optional[int] = None, tab_id: Optional[int] = None, start_index: Optional[int] = None, end_index: Optional[int] = None, query: Optional[str] = None, goal: Optional[str] = None, success: Optional[bool] = None, keys: Optional[str] = None, seconds: Optional[int] = None, **kwargs, ) -> ToolResult: """ Execute a specified browser action. Args: action: The browser action to perform url: URL for navigation or new tab index: Element index for click or input actions text: Text for input action or search query script: JavaScript code for execution scroll_amount: Pixels to scroll for scroll action tab_id: Tab ID for switch_tab action start_index: Starting character index for text observation end_index: Ending character index for text observation query: Search query for Google search goal: Extraction goal for content extraction success: Success status for done action keys: Keys to send for keyboard actions seconds: Seconds to wait **kwargs: Additional arguments Returns: ToolResult with the action's output or error """ async with self.lock: try: context = await self._ensure_browser_initialized() # Get max content length from config max_content_length = getattr( config.browser_config, "max_content_length", 2000 ) # Navigation actions if action == "navigate": if not url: return ToolResult(error="URL is required for 'navigate' action") await context.navigate_to(url) return ToolResult(output=f"Navigated to {url}") elif action == "go_back": await context.go_back() return ToolResult(output="Navigated back") elif action == "refresh": await context.refresh_page() return ToolResult(output="Refreshed current page") elif action == "web_search": if not query: return ToolResult( error="Query is required for 'web_search' action" ) search_results = await self.web_search_tool.execute(query) if search_results: # Navigate to the first search result first_result = search_results[0] await context.navigate_to(first_result) return ToolResult( output=f"Searched for '{query}' and navigated to first result: {first_result}\nAll results:" + "\n".join(search_results) ) else: return ToolResult( error=f"No search results found for '{query}'" ) # Element interaction actions elif action == "click_element": if index is None: return ToolResult( error="Index is required for 'click_element' action" ) element = await context.get_dom_element_by_index(index) if not element: return ToolResult(error=f"Element with index {index} not found") download_path = await context._click_element_node(element) output = f"Clicked element at index {index}" if download_path: output += f" - Downloaded file to {download_path}" return ToolResult(output=output) elif action == "input_text": if index is None or not text: return ToolResult( error="Index and text are required for 'input_text' action" ) element = await context.get_dom_element_by_index(index) if not element: return ToolResult(error=f"Element with index {index} not found") await context._input_text_element_node(element, text) return ToolResult( output=f"Input '{text}' into element at index {index}" ) elif action == "scroll_down" or action == "scroll_up": direction = 1 if action == "scroll_down" else -1 amount = ( scroll_amount if scroll_amount is not None else context.config.browser_window_size["height"] ) await context.execute_javascript( f"window.scrollBy(0, {direction * amount});" ) return ToolResult( output=f"Scrolled {'down' if direction > 0 else 'up'} by {amount} pixels" ) elif action == "scroll_to_text": if not text: return ToolResult( error="Text is required for 'scroll_to_text' action" ) page = await context.get_current_page() try: locator = page.get_by_text(text, exact=False) await locator.scroll_into_view_if_needed() return ToolResult(output=f"Scrolled to text: '{text}'") except Exception as e: return ToolResult(error=f"Failed to scroll to text: {str(e)}") elif action == "send_keys": if not keys: return ToolResult( error="Keys are required for 'send_keys' action" ) page = await context.get_current_page() await page.keyboard.press(keys) return ToolResult(output=f"Sent keys: {keys}") elif action == "get_dropdown_options": if index is None: return ToolResult( error="Index is required for 'get_dropdown_options' action" ) element = await context.get_dom_element_by_index(index) if not element: return ToolResult(error=f"Element with index {index} not found") page = await context.get_current_page() options = await page.evaluate( """ (xpath) => { const select = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue; if (!select) return null; return Array.from(select.options).map(opt => ({ text: opt.text, value: opt.value, index: opt.index })); } """, element.xpath, ) return ToolResult(output=f"Dropdown options: {options}") elif action == "select_dropdown_option": if index is None or not text: return ToolResult( error="Index and text are required for 'select_dropdown_option' action" ) element = await context.get_dom_element_by_index(index) if not element: return ToolResult(error=f"Element with index {index} not found") page = await context.get_current_page() await page.select_option(element.xpath, label=text) return ToolResult( output=f"Selected option '{text}' from dropdown at index {index}" ) # Content extraction actions elif action == "get_current_state": return await self.get_current_state(context) elif action == "get_html": html = await context.get_page_html() truncated = ( html[:max_content_length] + "..." if len(html) > max_content_length else html ) return ToolResult(output=truncated) elif action == "get_text": start = start_index if start_index is not None else 0 end = end_index if end_index is not None else max_content_length text = await context.execute_javascript( f"document.body.innerText.substring({start}, {end})" ) full_length = await context.execute_javascript( "document.body.innerText.length" ) result = f"Text from index {start} to {end}:\n{text}" if end < full_length: result += f"\n\n[Text continues... {full_length - end} more characters available]" if start > 0: result += f"\n[{start} characters before this point]" return ToolResult(output=result) elif action == "read_links": links = await context.execute_javascript( "Array.from(document.querySelectorAll('a[href]')).map(elem => elem.innerText && elem.href ? `${elem.innerText.trim()} - ${elem.href}` : null).filter(Boolean).join('\\n')" ) return ToolResult(output=links) elif action == "extract_content": if not goal: return ToolResult( error="Goal is required for 'extract_content' action" ) await context.get_page_html() # Note: In a real implementation, this would use an LLM to extract content return ToolResult(output=f"Extracted content for goal: {goal}") elif action == "screenshot": screenshot = await context.take_screenshot(full_page=True) return ToolResult( output=f"Screenshot captured (base64 length: {len(screenshot)})", system=screenshot, ) # Tab management actions elif action == "switch_tab": if tab_id is None: return ToolResult( error="Tab ID is required for 'switch_tab' action" ) await context.switch_to_tab(tab_id) return ToolResult(output=f"Switched to tab {tab_id}") elif action == "open_tab": if not url: return ToolResult(error="URL is required for 'open_tab' action") await context.create_new_tab(url) return ToolResult(output=f"Opened new tab with URL {url}") elif action == "close_tab": await context.close_current_tab() return ToolResult(output="Closed current tab") # Utility actions elif action == "wait": seconds_to_wait = seconds if seconds is not None else 3 await asyncio.sleep(seconds_to_wait) return ToolResult(output=f"Waited for {seconds_to_wait} seconds") elif action == "execute_js": if not script: return ToolResult( error="Script is required for 'execute_js' action" ) result = await context.execute_javascript(script) return ToolResult(output=str(result)) # Task completion elif action == "done": if not text: return ToolResult(error="Text is required for 'done' action") success_value = success if success is not None else True return ToolResult(output=text, is_done=True, success=success_value) else: return ToolResult(error=f"Unknown action: {action}") except Exception as e: return ToolResult(error=f"Browser action '{action}' failed: {str(e)}") async def get_current_state(self, context: BrowserContext) -> ToolResult: """Get the current browser state as a ToolResult.""" try: state = await context.get_state() state_info = { "url": state.url, "title": state.title, "tabs": [tab.model_dump() for tab in state.tabs], "help": "[0], [1], [2], etc., represent clickable indices corresponding to the elements listed. Clicking on these indices will navigate to or interact with the respective content behind them.", "interactive_elements": state.element_tree.clickable_elements_to_string(), "scroll_info": { "pixels_above": state.pixels_above, "pixels_below": state.pixels_below, "total_height": state.pixels_above + state.pixels_below + (state.viewport_info.height if state.viewport_info else 0), }, } return ToolResult( output=json.dumps(state_info, indent=4, ensure_ascii=False) ) except Exception as e: return ToolResult(error=f"Failed to get browser state: {str(e)}") async def cleanup(self): """Clean up browser resources.""" async with self.lock: if self.context is not None: await self.context.close() self.context = None self.dom_service = None if self.browser is not None: await self.browser.close() self.browser = None def __del__(self): """Ensure cleanup when object is destroyed.""" if self.browser is not None or self.context is not None: try: asyncio.run(self.cleanup()) except RuntimeError: loop = asyncio.new_event_loop() loop.run_until_complete(self.cleanup()) loop.close() @classmethod def create_with_context(cls, context: Context) -> "BrowserUseTool[Context]": """Factory method to create a BrowserUseTool with a specific context.""" tool = cls() tool.tool_context = context return tool