From 64786fd3b5ecf6c3140d9dff92e04f6a60dce9ee Mon Sep 17 00:00:00 2001 From: liangxinbing <1580466765@qq.com> Date: Fri, 7 Mar 2025 03:21:10 +0800 Subject: [PATCH] update toolcall, planning use and README.md --- README.md | 6 + app/agent/manus.py | 2 +- app/agent/toolcall.py | 27 +++- app/agent/toolcall_en.py | 182 ----------------------- app/agent/toolcall_zh.py | 178 ----------------------- app/flow/base.py | 42 +++--- app/flow/planning.py | 307 +++++++++++++++++++-------------------- app/tool/planning.py | 33 ++--- 8 files changed, 216 insertions(+), 561 deletions(-) delete mode 100644 app/agent/toolcall_en.py delete mode 100644 app/agent/toolcall_zh.py diff --git a/README.md b/README.md index c18764f..41cf1dd 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,12 @@ python main.py Then input your idea via terminal! +For unstable version, you also can run: + +```bash +python run_flow.py +``` + ## How to contribute We welcome any friendly suggestions and helpful contributions! Just create issues or submit pull requests. diff --git a/app/agent/manus.py b/app/agent/manus.py index 74365a2..7fa4b69 100644 --- a/app/agent/manus.py +++ b/app/agent/manus.py @@ -1,6 +1,6 @@ from pydantic import Field -from app.agent.toolcall_en import ToolCallAgent +from app.agent.toolcall import ToolCallAgent from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT from app.tool import Terminate, ToolCollection from app.tool.browser_use_tool import BrowserUseTool diff --git a/app/agent/toolcall.py b/app/agent/toolcall.py index e80df79..374a6eb 100644 --- a/app/agent/toolcall.py +++ b/app/agent/toolcall.py @@ -50,17 +50,22 @@ class ToolCallAgent(ReActAgent): self.tool_calls = response.tool_calls # Log response info - logger.info(f"Tool content: {response.content}") + logger.info(f"✨ {self.name}'s thoughts: {response.content}") logger.info( - f"Tool calls count: {len(response.tool_calls) if response.tool_calls else 0}" + f"🛠️ {self.name} selected {len(response.tool_calls) if response.tool_calls else 0} tools to use" ) - logger.info(f"Tool calls: {response.tool_calls}") + if response.tool_calls: + logger.info( + f"🧰 Tools being prepared: {[call.function.name for call in response.tool_calls]}" + ) try: # Handle different tool_choices modes if self.tool_choices == "none": if response.tool_calls: - logger.warning("Tool calls provided when tool_choice is 'none'") + logger.warning( + f"🤔 Hmm, {self.name} tried to use tools when they weren't available!" + ) if response.content: self.memory.add_message(Message.assistant_message(response.content)) return True @@ -85,7 +90,7 @@ class ToolCallAgent(ReActAgent): return bool(self.tool_calls) except Exception as e: - logger.error(f"Error in thinking phase: {e}") + logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}") self.memory.add_message( Message.assistant_message( f"Error encountered while processing: {str(e)}" @@ -105,7 +110,9 @@ class ToolCallAgent(ReActAgent): results = [] for command in self.tool_calls: result = await self.execute_tool(command) - logger.info(f"Executed tool {command.function.name} with result: {result}") + logger.info( + f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}" + ) # Add tool response to memory tool_msg = Message.tool_message( @@ -130,6 +137,7 @@ class ToolCallAgent(ReActAgent): args = json.loads(command.function.arguments or "{}") # Execute the tool + logger.info(f"🔧 Activating tool: '{name}'...") result = await self.available_tools.execute(name=name, tool_input=args) # Format result for display @@ -145,10 +153,12 @@ class ToolCallAgent(ReActAgent): return observation except json.JSONDecodeError: error_msg = f"Error parsing arguments for {name}: Invalid JSON format" - logger.error(error_msg) + logger.error( + f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON" + ) return f"Error: {error_msg}" except Exception as e: - error_msg = f"Error executing tool {name}: {str(e)}" + error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}" logger.error(error_msg) return f"Error: {error_msg}" @@ -159,6 +169,7 @@ class ToolCallAgent(ReActAgent): if self._should_finish_execution(name=name, result=result, **kwargs): # Set agent state to finished + logger.info(f"🏁 Special tool '{name}' has completed the task!") self.state = AgentState.FINISHED @staticmethod diff --git a/app/agent/toolcall_en.py b/app/agent/toolcall_en.py deleted file mode 100644 index 0b821d2..0000000 --- a/app/agent/toolcall_en.py +++ /dev/null @@ -1,182 +0,0 @@ -import json -from typing import Any, List, Literal - -from pydantic import Field - -from app.agent.react import ReActAgent -from app.logger import logger -from app.prompt.toolcall import NEXT_STEP_PROMPT, SYSTEM_PROMPT -from app.schema import AgentState, Message, ToolCall -from app.tool import CreateChatCompletion, Terminate, ToolCollection - - -TOOL_CALL_REQUIRED = "Tool calls required but none provided" - - -class ToolCallAgent(ReActAgent): - """Base agent class for handling tool/function calls with enhanced abstraction""" - - name: str = "toolcall" - description: str = "an agent that can execute tool calls." - - system_prompt: str = SYSTEM_PROMPT - next_step_prompt: str = NEXT_STEP_PROMPT - - available_tools: ToolCollection = ToolCollection( - CreateChatCompletion(), Terminate() - ) - tool_choices: Literal["none", "auto", "required"] = "auto" - special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name]) - - tool_calls: List[ToolCall] = Field(default_factory=list) - - max_steps: int = 30 - - async def think(self) -> bool: - """Process current state and decide next actions using tools""" - if self.next_step_prompt: - user_msg = Message.user_message(self.next_step_prompt) - self.messages += [user_msg] - - # Get response with tool options - response = await self.llm.ask_tool( - messages=self.messages, - system_msgs=[Message.system_message(self.system_prompt)] - if self.system_prompt - else None, - tools=self.available_tools.to_params(), - tool_choice=self.tool_choices, - ) - self.tool_calls = response.tool_calls - - # Log response info in a more engaging way - logger.info(f"✨ AI's thoughts: {response.content}") - logger.info( - f"🛠️ AI selected {len(response.tool_calls) if response.tool_calls else 0} tools to use" - ) - if response.tool_calls: - logger.info( - f"🧰 Tools being prepared: {[call.function.name for call in response.tool_calls]}" - ) - - try: - # Handle different tool_choices modes - if self.tool_choices == "none": - if response.tool_calls: - logger.warning( - "🤔 Hmm, AI tried to use tools when they weren't available!" - ) - if response.content: - self.memory.add_message(Message.assistant_message(response.content)) - return True - return False - - # Create and add assistant message - assistant_msg = ( - Message.from_tool_calls( - content=response.content, tool_calls=self.tool_calls - ) - if self.tool_calls - else Message.assistant_message(response.content) - ) - self.memory.add_message(assistant_msg) - - if self.tool_choices == "required" and not self.tool_calls: - return True # Will be handled in act() - - # For 'auto' mode, continue with content if no commands but content exists - if self.tool_choices == "auto" and not self.tool_calls: - return bool(response.content) - - return bool(self.tool_calls) - except Exception as e: - logger.error(f"🚨 Oops! The AI's thinking process hit a snag: {e}") - self.memory.add_message( - Message.assistant_message( - f"Error encountered while processing: {str(e)}" - ) - ) - return False - - async def act(self) -> str: - """Execute tool calls and handle their results""" - if not self.tool_calls: - if self.tool_choices == "required": - raise ValueError(TOOL_CALL_REQUIRED) - - # Return last message content if no tool calls - return self.messages[-1].content or "No content or commands to execute" - - results = [] - for command in self.tool_calls: - result = await self.execute_tool(command) - logger.info( - f"🎯 Tool '{command.function.name}' completed its mission! Result: {result[:100]}{'...' if len(result) > 100 else ''}" - ) - - # Add tool response to memory - tool_msg = Message.tool_message( - content=result, tool_call_id=command.id, name=command.function.name - ) - self.memory.add_message(tool_msg) - results.append(result) - - return "\n\n".join(results) - - async def execute_tool(self, command: ToolCall) -> str: - """Execute a single tool call with robust error handling""" - if not command or not command.function or not command.function.name: - return "Error: Invalid command format" - - name = command.function.name - if name not in self.available_tools.tool_map: - return f"Error: Unknown tool '{name}'" - - try: - # Parse arguments - args = json.loads(command.function.arguments or "{}") - - # Execute the tool - logger.info(f"🔧 Activating tool: '{name}'...") - result = await self.available_tools.execute(name=name, tool_input=args) - - # Format result for display - observation = ( - f"Observed output of cmd `{name}` executed:\n{str(result)}" - if result - else f"Cmd `{name}` completed with no output" - ) - - # Handle special tools like `finish` - await self._handle_special_tool(name=name, result=result) - - return observation - except json.JSONDecodeError: - error_msg = f"Error parsing arguments for {name}: Invalid JSON format" - logger.error( - f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON" - ) - return f"Error: {error_msg}" - except Exception as e: - error_msg = f"Error executing tool {name}: {str(e)}" - logger.error(f"⚠️ Tool '{name}' encountered a problem: {str(e)}") - return f"Error: {error_msg}" - - async def _handle_special_tool(self, name: str, result: Any, **kwargs): - """Handle special tool execution and state changes""" - if not self._is_special_tool(name): - return - - if self._should_finish_execution(name=name, result=result, **kwargs): - # Set agent state to finished - logger.info(f"🏁 Special tool '{name}' has completed the task!") - self.state = AgentState.FINISHED - - @staticmethod - def _should_finish_execution(**kwargs) -> bool: - """Determine if tool execution should finish the agent""" - return True - - def _is_special_tool(self, name: str) -> bool: - """Check if tool name is in special tools list""" - return name.lower() in [n.lower() for n in self.special_tool_names] diff --git a/app/agent/toolcall_zh.py b/app/agent/toolcall_zh.py deleted file mode 100644 index 28d8b25..0000000 --- a/app/agent/toolcall_zh.py +++ /dev/null @@ -1,178 +0,0 @@ -import json -from typing import Any, List, Literal - -from pydantic import Field - -from app.agent.react import ReActAgent -from app.logger import logger -from app.prompt.toolcall import NEXT_STEP_PROMPT, SYSTEM_PROMPT -from app.schema import AgentState, Message, ToolCall -from app.tool import CreateChatCompletion, Terminate, ToolCollection - - -TOOL_CALL_REQUIRED = "Tool calls required but none provided" - - -class ToolCallAgent(ReActAgent): - """Base agent class for handling tool/function calls with enhanced abstraction""" - - name: str = "toolcall" - description: str = "an agent that can execute tool calls." - - system_prompt: str = SYSTEM_PROMPT - next_step_prompt: str = NEXT_STEP_PROMPT - - available_tools: ToolCollection = ToolCollection( - CreateChatCompletion(), Terminate() - ) - tool_choices: Literal["none", "auto", "required"] = "auto" - special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name]) - - tool_calls: List[ToolCall] = Field(default_factory=list) - - max_steps: int = 30 - - async def think(self) -> bool: - """Process current state and decide next actions using tools""" - if self.next_step_prompt: - user_msg = Message.user_message(self.next_step_prompt) - self.messages += [user_msg] - - # Get response with tool options - response = await self.llm.ask_tool( - messages=self.messages, - system_msgs=[Message.system_message(self.system_prompt)] - if self.system_prompt - else None, - tools=self.available_tools.to_params(), - tool_choice=self.tool_choices, - ) - self.tool_calls = response.tool_calls - - # Log response info in a more engaging way - logger.info(f"✨ AI的思考过程:{response.content}") - logger.info( - f"🛠️ AI选择了 {len(response.tool_calls) if response.tool_calls else 0} 个工具来解决问题" - ) - if response.tool_calls: - logger.info( - f"🧰 准备使用的工具箱:{[call.function.name for call in response.tool_calls]}" - ) - - try: - # Handle different tool_choices modes - if self.tool_choices == "none": - if response.tool_calls: - logger.warning("🤔 咦?AI想用工具,但现在不允许使用任何工具呢!") - if response.content: - self.memory.add_message(Message.assistant_message(response.content)) - return True - return False - - # Create and add assistant message - assistant_msg = ( - Message.from_tool_calls( - content=response.content, tool_calls=self.tool_calls - ) - if self.tool_calls - else Message.assistant_message(response.content) - ) - self.memory.add_message(assistant_msg) - - if self.tool_choices == "required" and not self.tool_calls: - return True # Will be handled in act() - - # For 'auto' mode, continue with content if no commands but content exists - if self.tool_choices == "auto" and not self.tool_calls: - return bool(response.content) - - return bool(self.tool_calls) - except Exception as e: - logger.error(f"🚨 糟糕!AI思考时遇到了一点小问题:{e}") - self.memory.add_message( - Message.assistant_message( - f"Error encountered while processing: {str(e)}" - ) - ) - return False - - async def act(self) -> str: - """Execute tool calls and handle their results""" - if not self.tool_calls: - if self.tool_choices == "required": - raise ValueError(TOOL_CALL_REQUIRED) - - # Return last message content if no tool calls - return self.messages[-1].content or "No content or commands to execute" - - results = [] - for command in self.tool_calls: - result = await self.execute_tool(command) - logger.info( - f"🎯 工具「{command.function.name}」完成任务啦!结果:{result[:100]}{'...' if len(result) > 100 else ''}" - ) - - # Add tool response to memory - tool_msg = Message.tool_message( - content=result, tool_call_id=command.id, name=command.function.name - ) - self.memory.add_message(tool_msg) - results.append(result) - - return "\n\n".join(results) - - async def execute_tool(self, command: ToolCall) -> str: - """Execute a single tool call with robust error handling""" - if not command or not command.function or not command.function.name: - return "Error: Invalid command format" - - name = command.function.name - if name not in self.available_tools.tool_map: - return f"Error: Unknown tool '{name}'" - - try: - # Parse arguments - args = json.loads(command.function.arguments or "{}") - - # Execute the tool - logger.info(f"🔧 正在启动工具:「{name}」...") - result = await self.available_tools.execute(name=name, tool_input=args) - - # Format result for display - observation = ( - f"Observed output of cmd `{name}` executed:\n{str(result)}" - if result - else f"Cmd `{name}` completed with no output" - ) - - # Handle special tools like `finish` - await self._handle_special_tool(name=name, result=result) - - return observation - except json.JSONDecodeError: - error_msg = f"Error parsing arguments for {name}: Invalid JSON format" - logger.error(f"📝 哎呀!「{name}」工具的参数格式不对 - JSON解析失败") - return f"Error: {error_msg}" - except Exception as e: - error_msg = f"Error executing tool {name}: {str(e)}" - logger.error(f"⚠️ 工具「{name}」执行时遇到了一点小麻烦:{str(e)}") - return f"Error: {error_msg}" - - async def _handle_special_tool(self, name: str, result: Any, **kwargs): - """Handle special tool execution and state changes""" - if not self._is_special_tool(name): - return - - if self._should_finish_execution(name=name, result=result, **kwargs): - # Set agent state to finished - logger.info(f"🏁 特殊工具「{name}」已完成任务,AI助手的工作告一段落!") - self.state = AgentState.FINISHED - - @staticmethod - def _should_finish_execution(**kwargs) -> bool: - """Determine if tool execution should finish the agent""" - return True - - def _is_special_tool(self, name: str) -> bool: - """Check if tool name is in special tools list""" - return name.lower() in [n.lower() for n in self.special_tool_names] diff --git a/app/flow/base.py b/app/flow/base.py index ab2885f..13b3e17 100644 --- a/app/flow/base.py +++ b/app/flow/base.py @@ -2,43 +2,47 @@ from abc import ABC, abstractmethod from enum import Enum from typing import Dict, List, Optional, Union +from pydantic import BaseModel + from app.agent.base import BaseAgent -from app.agent.toolcall import ToolCallAgent class FlowType(str, Enum): PLANNING = "planning" -class BaseFlow(ABC): +class BaseFlow(BaseModel, ABC): """Base class for execution flows supporting multiple agents""" + agents: Dict[str, BaseAgent] + tools: Optional[List] = None + primary_agent_key: Optional[str] = None + + class Config: + arbitrary_types_allowed = True + def __init__( - self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **kwargs + self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data ): # Handle different ways of providing agents if isinstance(agents, BaseAgent): - self.agents = {"default": agents} + agents_dict = {"default": agents} elif isinstance(agents, list): - self.agents = {f"agent_{i}": agent for i, agent in enumerate(agents)} + agents_dict = {f"agent_{i}": agent for i, agent in enumerate(agents)} else: - self.agents = agents - - self.tools = kwargs.get("tools") - self.primary_agent_key = kwargs.get("primary_agent", None) + agents_dict = agents # If primary agent not specified, use first agent - if not self.primary_agent_key and self.agents: - self.primary_agent_key = next(iter(self.agents)) + primary_key = data.get("primary_agent_key") + if not primary_key and agents_dict: + primary_key = next(iter(agents_dict)) + data["primary_agent_key"] = primary_key - self._setup_agents() + # Set the agents dictionary + data["agents"] = agents_dict - def _setup_agents(self): - """Configure all agents with tools and initial setup""" - if self.tools: - for agent_key, agent in self.agents.items(): - if isinstance(agent, ToolCallAgent): - agent.available_tools = self.tools + # Initialize using BaseModel's init + super().__init__(**data) @property def primary_agent(self) -> Optional[BaseAgent]: @@ -52,8 +56,6 @@ class BaseFlow(ABC): def add_agent(self, key: str, agent: BaseAgent) -> None: """Add a new agent to the flow""" self.agents[key] = agent - if isinstance(agent, ToolCallAgent) and self.tools: - agent.available_tools = self.tools @abstractmethod async def execute(self, input_text: str) -> str: diff --git a/app/flow/planning.py b/app/flow/planning.py index 18b96fe..24fc9eb 100644 --- a/app/flow/planning.py +++ b/app/flow/planning.py @@ -1,67 +1,48 @@ +import json import time from typing import Dict, List, Optional, Union +from pydantic import Field + from app.agent.base import BaseAgent from app.flow.base import BaseFlow +from app.llm import LLM from app.logger import logger -from app.schema import AgentState -from app.tool import PlanningTool, ToolCollection +from app.schema import AgentState, Message +from app.tool import PlanningTool class PlanningFlow(BaseFlow): """A flow that manages planning and execution of tasks using agents.""" + llm: LLM = Field(default_factory=lambda: LLM()) + planning_tool: PlanningTool = Field(default_factory=PlanningTool) + executor_keys: List[str] = Field(default_factory=list) + active_plan_id: str = Field(default_factory=lambda: f"plan_{int(time.time())}") + current_step_index: Optional[int] = None + def __init__( - self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **kwargs + self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data ): - # Initialize planning tool first - self.planning_tool = self._initialize_planning_tool(kwargs.get("tools")) + # Set executor keys before super().__init__ + if "executors" in data: + data["executor_keys"] = data.pop("executors") - # If tools were provided, ensure planning tool is included - tools = kwargs.get("tools") - if tools: - planning_tool_exists = any( - isinstance(tool, PlanningTool) for tool in tools.tools - ) - if not planning_tool_exists: - tools.add_tool(self.planning_tool) - else: - # Create a new tool collection with at least the planning tool - tools = ToolCollection(self.planning_tool) - kwargs["tools"] = tools + # Set plan ID if provided + if "plan_id" in data: + data["active_plan_id"] = data.pop("plan_id") - super().__init__(agents, **kwargs) + # Initialize the planning tool if not provided + if "planning_tool" not in data: + planning_tool = PlanningTool() + data["planning_tool"] = planning_tool - # Define agent roles - self.planner_key = kwargs.get("planner", self.primary_agent_key) - self.executor_keys = kwargs.get("executors", list(self.agents.keys())) + # Call parent's init with the processed data + super().__init__(agents, **data) - # Planning state tracking - self.active_plan_id = kwargs.get("plan_id", f"plan_{int(time.time())}") - self.current_step_index = None - - # Ensure the planning tool has been initialized properly - if not hasattr(self.planning_tool, "_plans"): - self.planning_tool._plans = {} - - def _initialize_planning_tool( - self, tools: Optional[ToolCollection] - ) -> PlanningTool: - """Initialize planning tool, reusing existing one if available""" - if tools: - for tool in tools.tools: - if isinstance(tool, PlanningTool): - return tool - return PlanningTool() - - @property - def planner(self) -> Optional[BaseAgent]: - """Get the planning agent""" - return ( - self.agents.get(self.planner_key) - if self.planner_key - else self.primary_agent - ) + # Set executor_keys to all agent keys if not specified + if not self.executor_keys: + self.executor_keys = list(self.agents.keys()) def get_executor(self, step_type: Optional[str] = None) -> BaseAgent: """ @@ -91,7 +72,7 @@ class PlanningFlow(BaseFlow): await self._create_initial_plan(input_text) # Verify plan was created successfully - if self.active_plan_id not in self.planning_tool._plans: + if self.active_plan_id not in self.planning_tool.plans: logger.error( f"Plan creation failed. Plan ID {self.active_plan_id} not found in planning tool." ) @@ -123,76 +104,61 @@ class PlanningFlow(BaseFlow): return f"Execution failed: {str(e)}" async def _create_initial_plan(self, request: str) -> None: - """Create an initial plan based on the request using an appropriate agent.""" + """Create an initial plan based on the request using the flow's LLM and PlanningTool.""" logger.info(f"Creating initial plan with ID: {self.active_plan_id}") - agent = self.planner if self.planner else self.primary_agent + # Create a system message for plan creation + system_message = Message.system_message( + "You are a planning assistant. Your task is to create a detailed plan with clear steps." + ) - # First, directly create an empty plan to ensure the plan ID exists - self.planning_tool._plans[self.active_plan_id] = { - "title": f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}", - "description": f"Auto-generated plan for request: {request}", - "steps": [], - "step_status": {}, - "created_at": time.time(), - "updated_at": time.time(), - } + # Create a user message with the request + user_message = Message.user_message( + f"Create a detailed plan to accomplish this task: {request}" + ) - # Use agent.run to create the plan - plan_prompt = f""" - I need you to create a detailed plan to accomplish this task: + # Call LLM with PlanningTool + response = await self.llm.ask_tool( + messages=[user_message], + system_msgs=[system_message], + tools=[self.planning_tool.to_param()], + tool_choice="required", + ) - {request} + # Process tool calls if present + if response.tool_calls: + for tool_call in response.tool_calls: + if tool_call.function.name == "planning": + # Parse the arguments + args = tool_call.function.arguments + if isinstance(args, str): + try: + args = json.loads(args) + except json.JSONDecodeError: + logger.error(f"Failed to parse tool arguments: {args}") + continue - Please create a plan with ID {self.active_plan_id} using the planning tool. - The plan should include all necessary steps to complete the task. - """ + # Ensure plan_id is set correctly and execute the tool + args["plan_id"] = self.active_plan_id - try: - plan_result = await agent.run(plan_prompt) - logger.info(f"Plan creation result: {plan_result[:200]}...") + # Execute the tool via ToolCollection instead of directly + result = await self.planning_tool.execute(**args) - # Verify the plan was created - if ( - self.active_plan_id not in self.planning_tool._plans - or not self.planning_tool._plans[self.active_plan_id].get("steps") - ): - logger.warning( - "Plan may not have been created properly. Creating default plan." - ) - await self._create_default_plan(request) - except Exception as e: - logger.error(f"Error creating plan: {e}") - await self._create_default_plan(request) + logger.info(f"Plan creation result: {str(result)}") + return - async def _create_default_plan(self, request: str) -> None: - """Create a default plan if the agent fails to create one.""" - try: - # Try using the planning tool directly - await self.planning_tool.execute( - command="create", - plan_id=self.active_plan_id, - title=f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}", - description=f"Auto-generated plan for request: {request}", - steps=["Analyze request", "Execute task", "Verify results"], - ) - except Exception as e: - logger.error(f"Failed to create default plan with planning tool: {e}") - # Create plan directly in the planning tool's storage - self.planning_tool._plans[self.active_plan_id] = { - "title": f"Emergency Plan for: {request[:50]}{'...' if len(request) > 50 else ''}", - "description": f"Emergency auto-generated plan for request: {request}", + # If execution reached here, create a default plan + logger.warning("Creating default plan") + + # Create default plan using the ToolCollection + await self.planning_tool.execute( + **{ + "command": "create", + "plan_id": self.active_plan_id, + "title": f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}", "steps": ["Analyze request", "Execute task", "Verify results"], - "step_status": { - "0": "not_started", - "1": "not_started", - "2": "not_started", - }, - "created_at": time.time(), - "updated_at": time.time(), } - - logger.info(f"Created default plan with ID: {self.active_plan_id}") + ) async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]: """ @@ -201,20 +167,24 @@ class PlanningFlow(BaseFlow): """ if ( not self.active_plan_id - or self.active_plan_id not in self.planning_tool._plans + or self.active_plan_id not in self.planning_tool.plans ): logger.error(f"Plan with ID {self.active_plan_id} not found") return None, None try: - # Direct access to step status from planning tool storage - plan_data = self.planning_tool._plans[self.active_plan_id] + # Direct access to plan data from planning tool storage + plan_data = self.planning_tool.plans[self.active_plan_id] steps = plan_data.get("steps", []) - step_status = plan_data.get("step_status", {}) + step_statuses = plan_data.get("step_statuses", []) # Find first non-completed step for i, step in enumerate(steps): - status = step_status.get(str(i), "not_started") + if i >= len(step_statuses): + status = "not_started" + else: + status = step_statuses[i] + if status in ["not_started", "in_progress"]: # Extract step type/category if available step_info = {"text": step} @@ -236,10 +206,15 @@ class PlanningFlow(BaseFlow): ) except Exception as e: logger.warning(f"Error marking step as in_progress: {e}") - # Update step status directly - step_status[str(i)] = "in_progress" - plan_data["step_status"] = step_status - plan_data["updated_at"] = time.time() + # Update step status directly if needed + if i < len(step_statuses): + step_statuses[i] = "in_progress" + else: + while len(step_statuses) < i: + step_statuses.append("not_started") + step_statuses.append("in_progress") + + plan_data["step_statuses"] = step_statuses return i, step_info @@ -297,12 +272,17 @@ class PlanningFlow(BaseFlow): except Exception as e: logger.warning(f"Failed to update plan status: {e}") # Update step status directly in planning tool storage - if self.active_plan_id in self.planning_tool._plans: - plan_data = self.planning_tool._plans[self.active_plan_id] - step_status = plan_data.get("step_status", {}) - step_status[str(self.current_step_index)] = "completed" - plan_data["step_status"] = step_status - plan_data["updated_at"] = time.time() + if self.active_plan_id in self.planning_tool.plans: + plan_data = self.planning_tool.plans[self.active_plan_id] + step_statuses = plan_data.get("step_statuses", []) + + # Ensure the step_statuses list is long enough + while len(step_statuses) <= self.current_step_index: + step_statuses.append("not_started") + + # Update the status + step_statuses[self.current_step_index] = "completed" + plan_data["step_statuses"] = step_statuses async def _get_plan_text(self) -> str: """Get the current plan as formatted text.""" @@ -318,14 +298,20 @@ class PlanningFlow(BaseFlow): def _generate_plan_text_from_storage(self) -> str: """Generate plan text directly from storage if the planning tool fails.""" try: - if self.active_plan_id not in self.planning_tool._plans: + if self.active_plan_id not in self.planning_tool.plans: return f"Error: Plan with ID {self.active_plan_id} not found" - plan_data = self.planning_tool._plans[self.active_plan_id] + plan_data = self.planning_tool.plans[self.active_plan_id] title = plan_data.get("title", "Untitled Plan") - description = plan_data.get("description", "") steps = plan_data.get("steps", []) - step_status = plan_data.get("step_status", {}) + step_statuses = plan_data.get("step_statuses", []) + step_notes = plan_data.get("step_notes", []) + + # Ensure step_statuses and step_notes match the number of steps + while len(step_statuses) < len(steps): + step_statuses.append("not_started") + while len(step_notes) < len(steps): + step_notes.append("") # Count steps by status status_counts = { @@ -334,7 +320,8 @@ class PlanningFlow(BaseFlow): "blocked": 0, "not_started": 0, } - for status in step_status.values(): + + for status in step_statuses: if status in status_counts: status_counts[status] += 1 @@ -344,7 +331,7 @@ class PlanningFlow(BaseFlow): plan_text = f"Plan: {title} (ID: {self.active_plan_id})\n" plan_text += "=" * len(plan_text) + "\n\n" - plan_text += f"{description}\n\n" if description else "" + plan_text += ( f"Progress: {completed}/{total} steps completed ({progress:.1f}%)\n" ) @@ -352,8 +339,9 @@ class PlanningFlow(BaseFlow): plan_text += f"{status_counts['blocked']} blocked, {status_counts['not_started']} not started\n\n" plan_text += "Steps:\n" - for i, step in enumerate(steps): - status = step_status.get(str(i), "not_started") + for i, (step, status, notes) in enumerate( + zip(steps, step_statuses, step_notes) + ): if status == "completed": status_mark = "[✓]" elif status == "in_progress": @@ -364,39 +352,48 @@ class PlanningFlow(BaseFlow): status_mark = "[ ]" plan_text += f"{i}. {status_mark} {step}\n" + if notes: + plan_text += f" Notes: {notes}\n" return plan_text except Exception as e: logger.error(f"Error generating plan text from storage: {e}") return f"Error: Unable to retrieve plan with ID {self.active_plan_id}" - async def _get_plan(self) -> dict: - """Get the current plan as a dictionary.""" - if ( - not self.active_plan_id - or self.active_plan_id not in self.planning_tool._plans - ): - return {} - return self.planning_tool._plans[self.active_plan_id] - async def _finalize_plan(self) -> str: - """Finalize the plan and provide a summary using an appropriate agent.""" - agent = self.planner if self.planner else self.primary_agent + """Finalize the plan and provide a summary using the flow's LLM directly.""" plan_text = await self._get_plan_text() - # Create a summary prompt - summary_prompt = f""" - The plan has been completed. Here is the final plan status: - - {plan_text} - - Please provide a summary of what was accomplished and any final thoughts. - """ - - # Use agent.run() to generate the summary + # Create a summary using the flow's LLM directly try: - summary = await agent.run(summary_prompt) - return f"Plan completed:\n\n{summary}" + system_message = Message.system_message( + "You are a planning assistant. Your task is to summarize the completed plan." + ) + + user_message = Message.user_message( + f"The plan has been completed. Here is the final plan status:\n\n{plan_text}\n\nPlease provide a summary of what was accomplished and any final thoughts." + ) + + response = await self.llm.ask( + messages=[user_message], system_msgs=[system_message] + ) + + return f"Plan completed:\n\n{response}" except Exception as e: - logger.error(f"Error finalizing plan: {e}") - return "Plan completed. Error generating summary." + logger.error(f"Error finalizing plan with LLM: {e}") + + # Fallback to using an agent for the summary + try: + agent = self.primary_agent + summary_prompt = f""" + The plan has been completed. Here is the final plan status: + + {plan_text} + + Please provide a summary of what was accomplished and any final thoughts. + """ + summary = await agent.run(summary_prompt) + return f"Plan completed:\n\n{summary}" + except Exception as e2: + logger.error(f"Error finalizing plan with agent: {e2}") + return "Plan completed. Error generating summary." diff --git a/app/tool/planning.py b/app/tool/planning.py index 48d51b9..47e334d 100644 --- a/app/tool/planning.py +++ b/app/tool/planning.py @@ -66,10 +66,9 @@ class PlanningTool(BaseTool): "additionalProperties": False, } - _plans = {} # Dictionary to store plans by plan_id - _current_plan_id = None # Track the current active plan + plans: dict = {} # Dictionary to store plans by plan_id + _current_plan_id: Optional[str] = None # Track the current active plan - # async def __call__( async def execute( self, *, @@ -125,7 +124,7 @@ class PlanningTool(BaseTool): if not plan_id: raise ToolError("Parameter `plan_id` is required for command: create") - if plan_id in self._plans: + if plan_id in self.plans: raise ToolError( f"A plan with ID '{plan_id}' already exists. Use 'update' to modify existing plans." ) @@ -151,7 +150,7 @@ class PlanningTool(BaseTool): "step_notes": [""] * len(steps), } - self._plans[plan_id] = plan + self.plans[plan_id] = plan self._current_plan_id = plan_id # Set as active plan return ToolResult( @@ -165,10 +164,10 @@ class PlanningTool(BaseTool): if not plan_id: raise ToolError("Parameter `plan_id` is required for command: update") - if plan_id not in self._plans: + if plan_id not in self.plans: raise ToolError(f"No plan found with ID: {plan_id}") - plan = self._plans[plan_id] + plan = self.plans[plan_id] if title: plan["title"] = title @@ -209,13 +208,13 @@ class PlanningTool(BaseTool): def _list_plans(self) -> ToolResult: """List all available plans.""" - if not self._plans: + if not self.plans: return ToolResult( output="No plans available. Create a plan with the 'create' command." ) output = "Available plans:\n" - for plan_id, plan in self._plans.items(): + for plan_id, plan in self.plans.items(): current_marker = " (active)" if plan_id == self._current_plan_id else "" completed = sum( 1 for status in plan["step_statuses"] if status == "completed" @@ -236,10 +235,10 @@ class PlanningTool(BaseTool): ) plan_id = self._current_plan_id - if plan_id not in self._plans: + if plan_id not in self.plans: raise ToolError(f"No plan found with ID: {plan_id}") - plan = self._plans[plan_id] + plan = self.plans[plan_id] return ToolResult(output=self._format_plan(plan)) def _set_active_plan(self, plan_id: Optional[str]) -> ToolResult: @@ -247,12 +246,12 @@ class PlanningTool(BaseTool): if not plan_id: raise ToolError("Parameter `plan_id` is required for command: set_active") - if plan_id not in self._plans: + if plan_id not in self.plans: raise ToolError(f"No plan found with ID: {plan_id}") self._current_plan_id = plan_id return ToolResult( - output=f"Plan '{plan_id}' is now the active plan.\n\n{self._format_plan(self._plans[plan_id])}" + output=f"Plan '{plan_id}' is now the active plan.\n\n{self._format_plan(self.plans[plan_id])}" ) def _mark_step( @@ -271,13 +270,13 @@ class PlanningTool(BaseTool): ) plan_id = self._current_plan_id - if plan_id not in self._plans: + if plan_id not in self.plans: raise ToolError(f"No plan found with ID: {plan_id}") if step_index is None: raise ToolError("Parameter `step_index` is required for command: mark_step") - plan = self._plans[plan_id] + plan = self.plans[plan_id] if step_index < 0 or step_index >= len(plan["steps"]): raise ToolError( @@ -309,10 +308,10 @@ class PlanningTool(BaseTool): if not plan_id: raise ToolError("Parameter `plan_id` is required for command: delete") - if plan_id not in self._plans: + if plan_id not in self.plans: raise ToolError(f"No plan found with ID: {plan_id}") - del self._plans[plan_id] + del self.plans[plan_id] # If the deleted plan was the active plan, clear the active plan if self._current_plan_id == plan_id: