import asyncio
import json
from typing import Generic, Optional, TypeVar

from browser_use import Browser as BrowserUseBrowser
from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.dom.service import DomService
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from app.config import config
from app.llm import LLM
from app.tool.base import BaseTool, ToolResult
from app.tool.web_search import WebSearch


_BROWSER_DESCRIPTION = """
Interact with a web browser to perform various actions such as navigation, element interaction, content extraction, and tab management. This tool provides a comprehensive set of browser automation capabilities:

Navigation:
- 'go_to_url': Go to a specific URL in the current tab
- 'go_back': Go back
- 'refresh': Refresh the current page
- 'web_search': Search the query in the current tab, the query should be a search query like humans search in web, concrete and not vague or super long. More the single most important items.

Element Interaction:
- 'click_element': Click an element by index
- 'input_text': Input text into a form element
- 'scroll_down'/'scroll_up': Scroll the page (with optional pixel amount)
- 'scroll_to_text': If you dont find something which you want to interact with, scroll to it
- 'send_keys': Send strings of special keys like Escape,Backspace, Insert, PageDown, Delete, Enter, Shortcuts such as `Control+o`, `Control+Shift+T` are supported as well. This gets used in keyboard.press.
- 'get_dropdown_options': Get all options from a dropdown
- 'select_dropdown_option': Select dropdown option for interactive element index by the text of the option you want to select

Content Extraction:
- 'extract_content': Extract page content to retrieve specific information from the page, e.g. all company names, a specifc description, all information about, links with companies in structured format or simply links

Tab Management:
- 'switch_tab': Switch to a specific tab
- 'open_tab': Open a new tab with a URL
- 'close_tab': Close the current tab

Utility:
- 'wait': Wait for a specified number of seconds
"""

# Upper bound on the number of page characters embedded in the extraction
# prompt sent to the LLM.
_MAX_EXTRACTION_PROMPT_CHARS = 50000

# Length of the raw-page excerpt returned when extraction fails and the
# browser config does not provide ``max_content_length``.
_DEFAULT_MAX_CONTENT_LENGTH = 2000

# Prompt template for 'extract_content'; filled via ``str.format`` with the
# ``goal`` and ``page`` fields.
_EXTRACTION_PROMPT = """
Your task is to extract the content of the page. You will be given a page and a goal, and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format.
Extraction goal: {goal}

Page content:
{page}
"""

Context = TypeVar("Context")


class BrowserUseTool(BaseTool, Generic[Context]):
    """Tool exposing browser automation actions through a single ``execute`` entry point.

    The browser and its context are created lazily on the first action and
    released by ``cleanup``. All actions and cleanup are serialized through
    ``lock``.
    """

    name: str = "browser_use"
    description: str = _BROWSER_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "go_to_url",
                    "click_element",
                    "input_text",
                    "scroll_down",
                    "scroll_up",
                    "scroll_to_text",
                    "send_keys",
                    "get_dropdown_options",
                    "select_dropdown_option",
                    "go_back",
                    # 'refresh' is advertised in the description and handled
                    # by execute(), so it must be selectable here as well.
                    "refresh",
                    "web_search",
                    "wait",
                    "extract_content",
                    "switch_tab",
                    "open_tab",
                    "close_tab",
                ],
                "description": "The browser action to perform",
            },
            "url": {
                "type": "string",
                "description": "URL for 'go_to_url' or 'open_tab' actions",
            },
            "index": {
                "type": "integer",
                "description": "Element index for 'click_element', 'input_text', 'get_dropdown_options', or 'select_dropdown_option' actions",
            },
            "text": {
                "type": "string",
                "description": "Text for 'input_text', 'scroll_to_text', or 'select_dropdown_option' actions",
            },
            "scroll_amount": {
                "type": "integer",
                "description": "Pixels to scroll (positive for down, negative for up) for 'scroll_down' or 'scroll_up' actions",
            },
            "tab_id": {
                "type": "integer",
                "description": "Tab ID for 'switch_tab' action",
            },
            "query": {
                "type": "string",
                "description": "Search query for 'web_search' action",
            },
            "goal": {
                "type": "string",
                "description": "Extraction goal for 'extract_content' action",
            },
            "keys": {
                "type": "string",
                "description": "Keys to send for 'send_keys' action",
            },
            "seconds": {
                "type": "integer",
                "description": "Seconds to wait for 'wait' action",
            },
        },
        "required": ["action"],
        "dependencies": {
            "go_to_url": ["url"],
            "click_element": ["index"],
            "input_text": ["index", "text"],
            "switch_tab": ["tab_id"],
            "open_tab": ["url"],
            "scroll_down": ["scroll_amount"],
            "scroll_up": ["scroll_amount"],
            "scroll_to_text": ["text"],
            "send_keys": ["keys"],
            "get_dropdown_options": ["index"],
            "select_dropdown_option": ["index", "text"],
            "go_back": [],
            "web_search": ["query"],
            "wait": ["seconds"],
            "extract_content": ["goal"],
        },
    }

    # Serializes browser actions and cleanup.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
    context: Optional[BrowserContext] = Field(default=None, exclude=True)
    dom_service: Optional[DomService] = Field(default=None, exclude=True)
    web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)

    # Context for generic functionality
    tool_context: Optional[Context] = Field(default=None, exclude=True)

    llm: Optional[LLM] = Field(default_factory=LLM)

    @field_validator("parameters", mode="before")
    @classmethod
    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
        """Reject an empty ``parameters`` schema.

        Raises:
            ValueError: If ``v`` is falsy (``None`` or an empty dict).
        """
        if not v:
            raise ValueError("Parameters cannot be empty")
        return v

    async def _ensure_browser_initialized(self) -> BrowserContext:
        """Create the browser and browser context on first use and return the context.

        Defaults (``headless=False``, ``disable_security=True``) are
        overridden by any matching attribute set on ``config.browser_config``.
        """
        if self.browser is None:
            browser_config_kwargs = {"headless": False, "disable_security": True}

            if config.browser_config:
                from browser_use.browser.browser import ProxySettings

                # Forward proxy settings only when a proxy server is configured.
                proxy = config.browser_config.proxy
                if proxy and proxy.server:
                    browser_config_kwargs["proxy"] = ProxySettings(
                        server=proxy.server,
                        username=proxy.username,
                        password=proxy.password,
                    )

                browser_attrs = [
                    "headless",
                    "disable_security",
                    "extra_chromium_args",
                    "chrome_instance_path",
                    "wss_url",
                    "cdp_url",
                ]
                for attr in browser_attrs:
                    value = getattr(config.browser_config, attr, None)
                    if value is None:
                        continue
                    # Skip empty lists so they do not override the defaults.
                    if isinstance(value, list) and not value:
                        continue
                    browser_config_kwargs[attr] = value

            self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

        if self.context is None:
            context_config = BrowserContextConfig()

            # Prefer a context config supplied through the application config.
            configured = (
                getattr(config.browser_config, "new_context_config", None)
                if config.browser_config
                else None
            )
            if configured:
                context_config = configured

            self.context = await self.browser.new_context(context_config)
            self.dom_service = DomService(await self.context.get_current_page())

        return self.context

    async def _extract_content(self, context: BrowserContext, goal: str) -> ToolResult:
        """Ask the LLM to extract information matching ``goal`` from the current page.

        The page HTML is converted to markdown when ``markdownify`` is
        importable, truncated to ``_MAX_EXTRACTION_PROMPT_CHARS`` and sent to
        ``self.llm`` with a forced ``extract_content`` function call.

        Returns:
            A ToolResult whose output is the extracted JSON. If extraction
            fails after the page content was read, the output is the error
            message followed by an excerpt of the page; if the page content
            could not be read at all, the result carries only the error.
        """
        page = await context.get_current_page()
        # Stays None until the page content has been read, so the error path
        # can tell whether there is anything to fall back to.
        content: Optional[str] = None
        try:
            html_content = await page.content()

            # Imported lazily so markdownify remains an optional dependency.
            try:
                import markdownify

                content = markdownify.markdownify(html_content)
            except ImportError:
                content = html_content

            formatted_prompt = _EXTRACTION_PROMPT.format(
                goal=goal, page=content[:_MAX_EXTRACTION_PROMPT_CHARS]
            )

            # Imported locally, as in the rest of this tool's lazy imports.
            from app.schema import Message

            messages = [Message.user_message(formatted_prompt)]

            extraction_function = {
                "type": "function",
                "function": {
                    "name": "extract_content",
                    "description": "Extract specific information from a webpage based on a goal",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "extracted_content": {
                                "type": "object",
                                "description": "The content extracted from the page according to the goal",
                            }
                        },
                        "required": ["extracted_content"],
                    },
                },
            }

            # tool_choice="required" forces the model to answer via the function.
            response = await self.llm.ask_tool(
                messages,
                tools=[extraction_function],
                tool_choice="required",
            )

            if response and response.tool_calls:
                tool_call = response.tool_calls[0]
                try:
                    args = json.loads(tool_call.function.arguments)
                    extracted_content = args.get("extracted_content", {})
                    content_json = json.dumps(
                        extracted_content, indent=2, ensure_ascii=False
                    )
                    msg = f"Extracted from page:\n{content_json}\n"
                except Exception as e:
                    msg = f"Error parsing extraction result: {str(e)}\nRaw response: {tool_call.function.arguments}"
            else:
                msg = "No content was extracted from the page."

            return ToolResult(output=msg)
        except Exception as e:
            error_msg = f"Failed to extract content: {str(e)}"
            if content is None:
                # The page itself could not be read; nothing to fall back to.
                return ToolResult(error=error_msg)
            # Excerpt length follows the configured max_content_length,
            # defaulting to the previously hard-coded 2000 characters.
            excerpt_length = (
                getattr(config.browser_config, "max_content_length", None)
                or _DEFAULT_MAX_CONTENT_LENGTH
            )
            return ToolResult(
                output=f"{error_msg}\nHere's a portion of the page content:\n{content[:excerpt_length]}..."
            )

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        scroll_amount: Optional[int] = None,
        tab_id: Optional[int] = None,
        query: Optional[str] = None,
        goal: Optional[str] = None,
        keys: Optional[str] = None,
        seconds: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        """
        Execute a specified browser action.

        Args:
            action: The browser action to perform
            url: URL for navigation or new tab
            index: Element index for click, input or dropdown actions
            text: Text for input, scroll-to-text or dropdown selection
            scroll_amount: Pixels to scroll; only the magnitude is used, the
                direction comes from the action name. Defaults to the
                configured window height.
            tab_id: Tab ID for switch_tab action
            query: Search query for the web_search action
            goal: Extraction goal for content extraction
            keys: Keys to send for keyboard actions
            seconds: Seconds to wait (defaults to 3)
            **kwargs: Additional arguments (ignored)

        Returns:
            ToolResult with the action's output or error. Exceptions raised
            by the browser are caught and reported in ``error``.
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()

                # Navigation actions
                if action == "go_to_url":
                    if not url:
                        return ToolResult(
                            error="URL is required for 'go_to_url' action"
                        )
                    page = await context.get_current_page()
                    await page.goto(url)
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Navigated to {url}")

                elif action == "go_back":
                    await context.go_back()
                    return ToolResult(output="Navigated back")

                elif action == "refresh":
                    await context.refresh_page()
                    return ToolResult(output="Refreshed current page")

                elif action == "web_search":
                    if not query:
                        return ToolResult(
                            error="Query is required for 'web_search' action"
                        )
                    search_results = await self.web_search_tool.execute(query)
                    if not search_results:
                        return ToolResult(
                            error=f"No search results found for '{query}'"
                        )

                    # Navigate to the first search result
                    first_result = search_results[0]
                    if isinstance(first_result, dict) and "url" in first_result:
                        url_to_navigate = first_result["url"]
                    elif isinstance(first_result, str):
                        url_to_navigate = first_result
                    else:
                        return ToolResult(
                            error=f"Invalid search result format: {first_result}"
                        )

                    page = await context.get_current_page()
                    await page.goto(url_to_navigate)
                    await page.wait_for_load_state()

                    # Newline after the heading keeps the first result on its
                    # own line instead of glued to "All results:".
                    all_results = "\n".join(str(r) for r in search_results)
                    return ToolResult(
                        output=f"Searched for '{query}' and navigated to first result: {url_to_navigate}\nAll results:\n"
                        + all_results
                    )

                # Element interaction actions
                elif action == "click_element":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'click_element' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    download_path = await context._click_element_node(element)
                    output = f"Clicked element at index {index}"
                    if download_path:
                        output += f" - Downloaded file to {download_path}"
                    return ToolResult(output=output)

                elif action == "input_text":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'input_text' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    await context._input_text_element_node(element, text)
                    return ToolResult(
                        output=f"Input '{text}' into element at index {index}"
                    )

                elif action == "scroll_down" or action == "scroll_up":
                    direction = 1 if action == "scroll_down" else -1
                    # The action name decides the direction. Taking the
                    # magnitude prevents a negative amount passed to
                    # 'scroll_up' from scrolling down instead.
                    amount = (
                        abs(scroll_amount)
                        if scroll_amount is not None
                        else context.config.browser_window_size["height"]
                    )
                    await context.execute_javascript(
                        f"window.scrollBy(0, {direction * amount});"
                    )
                    return ToolResult(
                        output=f"Scrolled {'down' if direction > 0 else 'up'} by {amount} pixels"
                    )

                elif action == "scroll_to_text":
                    if not text:
                        return ToolResult(
                            error="Text is required for 'scroll_to_text' action"
                        )
                    page = await context.get_current_page()
                    try:
                        # Locator actions are strict; '.first' avoids a
                        # strict-mode error when the text occurs several times.
                        locator = page.get_by_text(text, exact=False).first
                        await locator.scroll_into_view_if_needed()
                        return ToolResult(output=f"Scrolled to text: '{text}'")
                    except Exception as e:
                        return ToolResult(error=f"Failed to scroll to text: {str(e)}")

                elif action == "send_keys":
                    if not keys:
                        return ToolResult(
                            error="Keys are required for 'send_keys' action"
                        )
                    page = await context.get_current_page()
                    await page.keyboard.press(keys)
                    return ToolResult(output=f"Sent keys: {keys}")

                elif action == "get_dropdown_options":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'get_dropdown_options' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # Evaluates to null in the page when the XPath does not
                    # resolve to a node.
                    options = await page.evaluate(
                        """
                        (xpath) => {
                            const select = document.evaluate(xpath, document, null,
                                XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                            if (!select) return null;
                            return Array.from(select.options).map(opt => ({
                                text: opt.text,
                                value: opt.value,
                                index: opt.index
                            }));
                        }
                    """,
                        element.xpath,
                    )
                    return ToolResult(output=f"Dropdown options: {options}")

                elif action == "select_dropdown_option":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'select_dropdown_option' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    await page.select_option(element.xpath, label=text)
                    return ToolResult(
                        output=f"Selected option '{text}' from dropdown at index {index}"
                    )

                # Content extraction actions
                elif action == "extract_content":
                    if not goal:
                        return ToolResult(
                            error="Goal is required for 'extract_content' action"
                        )
                    return await self._extract_content(context, goal)

                # Tab management actions
                elif action == "switch_tab":
                    if tab_id is None:
                        return ToolResult(
                            error="Tab ID is required for 'switch_tab' action"
                        )
                    await context.switch_to_tab(tab_id)
                    page = await context.get_current_page()
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Switched to tab {tab_id}")

                elif action == "open_tab":
                    if not url:
                        return ToolResult(error="URL is required for 'open_tab' action")
                    await context.create_new_tab(url)
                    return ToolResult(output=f"Opened new tab with {url}")

                elif action == "close_tab":
                    await context.close_current_tab()
                    return ToolResult(output="Closed current tab")

                # Utility actions
                elif action == "wait":
                    seconds_to_wait = seconds if seconds is not None else 3
                    await asyncio.sleep(seconds_to_wait)
                    return ToolResult(output=f"Waited for {seconds_to_wait} seconds")

                else:
                    return ToolResult(error=f"Unknown action: {action}")

            except Exception as e:
                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

    async def get_current_state(
        self, context: Optional[BrowserContext] = None
    ) -> ToolResult:
        """
        Get the current browser state as a ToolResult.

        If context is not provided, uses self.context. The output is a JSON
        document with the URL, title, tabs, interactive elements and scroll
        information; a full-page screenshot is attached as ``base64_image``.
        Failures are reported through the result's ``error`` field.
        """
        try:
            # Use provided context or fall back to self.context
            ctx = context or self.context
            if not ctx:
                return ToolResult(error="Browser context not initialized")

            state = await ctx.get_state()

            # Prefer the viewport reported by the state; otherwise fall back
            # to the configured window height.
            viewport_height = 0
            if getattr(state, "viewport_info", None):
                viewport_height = state.viewport_info.height
            elif hasattr(ctx, "config") and hasattr(ctx.config, "browser_window_size"):
                viewport_height = ctx.config.browser_window_size.get("height", 0)

            screenshot = await ctx.take_screenshot(full_page=True)

            pixels_above = getattr(state, "pixels_above", 0)
            pixels_below = getattr(state, "pixels_below", 0)

            state_info = {
                "url": state.url,
                "title": state.title,
                "tabs": [tab.model_dump() for tab in state.tabs],
                "help": "[0], [1], [2], etc., represent clickable indices corresponding to the elements listed. Clicking on these indices will navigate to or interact with the respective content behind them.",
                "interactive_elements": (
                    state.element_tree.clickable_elements_to_string()
                    if state.element_tree
                    else ""
                ),
                "scroll_info": {
                    "pixels_above": pixels_above,
                    "pixels_below": pixels_below,
                    "total_height": pixels_above + pixels_below + viewport_height,
                },
                "viewport_height": viewport_height,
            }

            return ToolResult(
                output=json.dumps(state_info, indent=4, ensure_ascii=False),
                base64_image=screenshot,
            )
        except Exception as e:
            return ToolResult(error=f"Failed to get browser state: {str(e)}")

    async def cleanup(self):
        """Close the browser context and browser, if open, and reset the fields to None."""
        async with self.lock:
            if self.context is not None:
                await self.context.close()
                self.context = None
                self.dom_service = None
            if self.browser is not None:
                await self.browser.close()
                self.browser = None

    def __del__(self):
        """Best-effort cleanup of browser resources when the object is destroyed."""
        # getattr guards against instances whose fields were never set
        # (for example when construction failed part-way).
        if (
            getattr(self, "browser", None) is None
            and getattr(self, "context", None) is None
        ):
            return
        try:
            asyncio.run(self.cleanup())
        except RuntimeError:
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.cleanup())
            finally:
                # Close the temporary loop even if cleanup raises.
                loop.close()

    @classmethod
    def create_with_context(cls, context: Context) -> "BrowserUseTool[Context]":
        """Factory method to create a BrowserUseTool with a specific context."""
        tool = cls()
        tool.tool_context = context
        return tool