130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
import json
|
|
from typing import Any, Optional
|
|
|
|
from pydantic import Field
|
|
|
|
from app.agent.toolcall import ToolCallAgent
|
|
from app.logger import logger
|
|
from app.prompt.browser import NEXT_STEP_PROMPT, SYSTEM_PROMPT
|
|
from app.schema import Message, ToolChoice
|
|
from app.tool import BrowserUseTool, Terminate, ToolCollection
|
|
|
|
|
|
class BrowserAgent(ToolCallAgent):
|
|
"""
|
|
A browser agent that uses the browser_use library to control a browser.
|
|
|
|
This agent can navigate web pages, interact with elements, fill forms,
|
|
extract content, and perform other browser-based actions to accomplish tasks.
|
|
"""
|
|
|
|
name: str = "browser"
|
|
description: str = "A browser agent that can control a browser to accomplish tasks"
|
|
|
|
system_prompt: str = SYSTEM_PROMPT
|
|
next_step_prompt: str = NEXT_STEP_PROMPT
|
|
|
|
max_observe: int = 10000
|
|
max_steps: int = 20
|
|
|
|
# Configure the available tools
|
|
available_tools: ToolCollection = Field(
|
|
default_factory=lambda: ToolCollection(BrowserUseTool(), Terminate())
|
|
)
|
|
|
|
# Use Auto for tool choice to allow both tool usage and free-form responses
|
|
tool_choices: ToolChoice = ToolChoice.AUTO
|
|
special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])
|
|
|
|
_current_base64_image: Optional[str] = None
|
|
|
|
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
|
|
if not self._is_special_tool(name):
|
|
return
|
|
else:
|
|
await self.available_tools.get_tool(BrowserUseTool().name).cleanup()
|
|
await super()._handle_special_tool(name, result, **kwargs)
|
|
|
|
async def get_browser_state(self) -> Optional[dict]:
|
|
"""Get the current browser state for context in next steps."""
|
|
browser_tool = self.available_tools.get_tool(BrowserUseTool().name)
|
|
if not browser_tool:
|
|
return None
|
|
|
|
try:
|
|
# Get browser state directly from the tool
|
|
result = await browser_tool.get_current_state()
|
|
|
|
if result.error:
|
|
logger.debug(f"Browser state error: {result.error}")
|
|
return None
|
|
|
|
# Store screenshot if available
|
|
if hasattr(result, "base64_image") and result.base64_image:
|
|
self._current_base64_image = result.base64_image
|
|
|
|
# Parse the state info
|
|
return json.loads(result.output)
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Failed to get browser state: {str(e)}")
|
|
return None
|
|
|
|
async def think(self) -> bool:
|
|
"""Process current state and decide next actions using tools, with browser state info added"""
|
|
# Add browser state to the context
|
|
browser_state = await self.get_browser_state()
|
|
|
|
# Initialize placeholder values
|
|
url_info = ""
|
|
tabs_info = ""
|
|
content_above_info = ""
|
|
content_below_info = ""
|
|
results_info = ""
|
|
|
|
if browser_state and not browser_state.get("error"):
|
|
# URL and title info
|
|
url_info = f"\n URL: {browser_state.get('url', 'N/A')}\n Title: {browser_state.get('title', 'N/A')}"
|
|
|
|
# Tab information
|
|
if "tabs" in browser_state:
|
|
tabs = browser_state.get("tabs", [])
|
|
if tabs:
|
|
tabs_info = f"\n {len(tabs)} tab(s) available"
|
|
|
|
# Content above/below viewport
|
|
pixels_above = browser_state.get("pixels_above", 0)
|
|
pixels_below = browser_state.get("pixels_below", 0)
|
|
|
|
if pixels_above > 0:
|
|
content_above_info = f" ({pixels_above} pixels)"
|
|
|
|
if pixels_below > 0:
|
|
content_below_info = f" ({pixels_below} pixels)"
|
|
|
|
# Add screenshot as base64 if available
|
|
if self._current_base64_image:
|
|
# Create a message with image attachment
|
|
image_message = Message.user_message(
|
|
content="Current browser screenshot:",
|
|
base64_image=self._current_base64_image,
|
|
)
|
|
self.memory.add_message(image_message)
|
|
|
|
# Replace placeholders with actual browser state info
|
|
self.next_step_prompt = NEXT_STEP_PROMPT.format(
|
|
url_placeholder=url_info,
|
|
tabs_placeholder=tabs_info,
|
|
content_above_placeholder=content_above_info,
|
|
content_below_placeholder=content_below_info,
|
|
results_placeholder=results_info,
|
|
)
|
|
|
|
# Call parent implementation
|
|
result = await super().think()
|
|
|
|
# Reset the next_step_prompt to its original state
|
|
self.next_step_prompt = NEXT_STEP_PROMPT
|
|
|
|
return result
|