update toolcall, planning use and README.md

This commit is contained in:
liangxinbing 2025-03-07 03:21:10 +08:00
parent caae67da97
commit 64786fd3b5
8 changed files with 216 additions and 561 deletions

View File

@ -70,6 +70,12 @@ python main.py
Then input your idea via terminal!
For unstable version, you also can run:
```bash
python run_flow.py
```
## How to contribute
We welcome any friendly suggestions and helpful contributions! Just create issues or submit pull requests.

View File

@ -1,6 +1,6 @@
from pydantic import Field
from app.agent.toolcall_en import ToolCallAgent
from app.agent.toolcall import ToolCallAgent
from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.tool import Terminate, ToolCollection
from app.tool.browser_use_tool import BrowserUseTool

View File

@ -50,17 +50,22 @@ class ToolCallAgent(ReActAgent):
self.tool_calls = response.tool_calls
# Log response info
logger.info(f"Tool content: {response.content}")
logger.info(f"{self.name}'s thoughts: {response.content}")
logger.info(
f"Tool calls count: {len(response.tool_calls) if response.tool_calls else 0}"
f"🛠️ {self.name} selected {len(response.tool_calls) if response.tool_calls else 0} tools to use"
)
logger.info(f"Tool calls: {response.tool_calls}")
if response.tool_calls:
logger.info(
f"🧰 Tools being prepared: {[call.function.name for call in response.tool_calls]}"
)
try:
# Handle different tool_choices modes
if self.tool_choices == "none":
if response.tool_calls:
logger.warning("Tool calls provided when tool_choice is 'none'")
logger.warning(
f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
)
if response.content:
self.memory.add_message(Message.assistant_message(response.content))
return True
@ -85,7 +90,7 @@ class ToolCallAgent(ReActAgent):
return bool(self.tool_calls)
except Exception as e:
logger.error(f"Error in thinking phase: {e}")
logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
self.memory.add_message(
Message.assistant_message(
f"Error encountered while processing: {str(e)}"
@ -105,7 +110,9 @@ class ToolCallAgent(ReActAgent):
results = []
for command in self.tool_calls:
result = await self.execute_tool(command)
logger.info(f"Executed tool {command.function.name} with result: {result}")
logger.info(
f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
)
# Add tool response to memory
tool_msg = Message.tool_message(
@ -130,6 +137,7 @@ class ToolCallAgent(ReActAgent):
args = json.loads(command.function.arguments or "{}")
# Execute the tool
logger.info(f"🔧 Activating tool: '{name}'...")
result = await self.available_tools.execute(name=name, tool_input=args)
# Format result for display
@ -145,10 +153,12 @@ class ToolCallAgent(ReActAgent):
return observation
except json.JSONDecodeError:
error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
logger.error(error_msg)
logger.error(
f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON"
)
return f"Error: {error_msg}"
except Exception as e:
error_msg = f"Error executing tool {name}: {str(e)}"
error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
logger.error(error_msg)
return f"Error: {error_msg}"
@ -159,6 +169,7 @@ class ToolCallAgent(ReActAgent):
if self._should_finish_execution(name=name, result=result, **kwargs):
# Set agent state to finished
logger.info(f"🏁 Special tool '{name}' has completed the task!")
self.state = AgentState.FINISHED
@staticmethod

View File

@ -1,182 +0,0 @@
import json
from typing import Any, List, Literal
from pydantic import Field
from app.agent.react import ReActAgent
from app.logger import logger
from app.prompt.toolcall import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.schema import AgentState, Message, ToolCall
from app.tool import CreateChatCompletion, Terminate, ToolCollection
TOOL_CALL_REQUIRED = "Tool calls required but none provided"
class ToolCallAgent(ReActAgent):
"""Base agent class for handling tool/function calls with enhanced abstraction"""
name: str = "toolcall"
description: str = "an agent that can execute tool calls."
system_prompt: str = SYSTEM_PROMPT
next_step_prompt: str = NEXT_STEP_PROMPT
available_tools: ToolCollection = ToolCollection(
CreateChatCompletion(), Terminate()
)
tool_choices: Literal["none", "auto", "required"] = "auto"
special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
tool_calls: List[ToolCall] = Field(default_factory=list)
max_steps: int = 30
async def think(self) -> bool:
"""Process current state and decide next actions using tools"""
if self.next_step_prompt:
user_msg = Message.user_message(self.next_step_prompt)
self.messages += [user_msg]
# Get response with tool options
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=[Message.system_message(self.system_prompt)]
if self.system_prompt
else None,
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
self.tool_calls = response.tool_calls
# Log response info in a more engaging way
logger.info(f"✨ AI's thoughts: {response.content}")
logger.info(
f"🛠️ AI selected {len(response.tool_calls) if response.tool_calls else 0} tools to use"
)
if response.tool_calls:
logger.info(
f"🧰 Tools being prepared: {[call.function.name for call in response.tool_calls]}"
)
try:
# Handle different tool_choices modes
if self.tool_choices == "none":
if response.tool_calls:
logger.warning(
"🤔 Hmm, AI tried to use tools when they weren't available!"
)
if response.content:
self.memory.add_message(Message.assistant_message(response.content))
return True
return False
# Create and add assistant message
assistant_msg = (
Message.from_tool_calls(
content=response.content, tool_calls=self.tool_calls
)
if self.tool_calls
else Message.assistant_message(response.content)
)
self.memory.add_message(assistant_msg)
if self.tool_choices == "required" and not self.tool_calls:
return True # Will be handled in act()
# For 'auto' mode, continue with content if no commands but content exists
if self.tool_choices == "auto" and not self.tool_calls:
return bool(response.content)
return bool(self.tool_calls)
except Exception as e:
logger.error(f"🚨 Oops! The AI's thinking process hit a snag: {e}")
self.memory.add_message(
Message.assistant_message(
f"Error encountered while processing: {str(e)}"
)
)
return False
async def act(self) -> str:
"""Execute tool calls and handle their results"""
if not self.tool_calls:
if self.tool_choices == "required":
raise ValueError(TOOL_CALL_REQUIRED)
# Return last message content if no tool calls
return self.messages[-1].content or "No content or commands to execute"
results = []
for command in self.tool_calls:
result = await self.execute_tool(command)
logger.info(
f"🎯 Tool '{command.function.name}' completed its mission! Result: {result[:100]}{'...' if len(result) > 100 else ''}"
)
# Add tool response to memory
tool_msg = Message.tool_message(
content=result, tool_call_id=command.id, name=command.function.name
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)
async def execute_tool(self, command: ToolCall) -> str:
"""Execute a single tool call with robust error handling"""
if not command or not command.function or not command.function.name:
return "Error: Invalid command format"
name = command.function.name
if name not in self.available_tools.tool_map:
return f"Error: Unknown tool '{name}'"
try:
# Parse arguments
args = json.loads(command.function.arguments or "{}")
# Execute the tool
logger.info(f"🔧 Activating tool: '{name}'...")
result = await self.available_tools.execute(name=name, tool_input=args)
# Format result for display
observation = (
f"Observed output of cmd `{name}` executed:\n{str(result)}"
if result
else f"Cmd `{name}` completed with no output"
)
# Handle special tools like `finish`
await self._handle_special_tool(name=name, result=result)
return observation
except json.JSONDecodeError:
error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
logger.error(
f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON"
)
return f"Error: {error_msg}"
except Exception as e:
error_msg = f"Error executing tool {name}: {str(e)}"
logger.error(f"⚠️ Tool '{name}' encountered a problem: {str(e)}")
return f"Error: {error_msg}"
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
"""Handle special tool execution and state changes"""
if not self._is_special_tool(name):
return
if self._should_finish_execution(name=name, result=result, **kwargs):
# Set agent state to finished
logger.info(f"🏁 Special tool '{name}' has completed the task!")
self.state = AgentState.FINISHED
@staticmethod
def _should_finish_execution(**kwargs) -> bool:
"""Determine if tool execution should finish the agent"""
return True
def _is_special_tool(self, name: str) -> bool:
"""Check if tool name is in special tools list"""
return name.lower() in [n.lower() for n in self.special_tool_names]

View File

@ -1,178 +0,0 @@
import json
from typing import Any, List, Literal
from pydantic import Field
from app.agent.react import ReActAgent
from app.logger import logger
from app.prompt.toolcall import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.schema import AgentState, Message, ToolCall
from app.tool import CreateChatCompletion, Terminate, ToolCollection
TOOL_CALL_REQUIRED = "Tool calls required but none provided"
class ToolCallAgent(ReActAgent):
"""Base agent class for handling tool/function calls with enhanced abstraction"""
name: str = "toolcall"
description: str = "an agent that can execute tool calls."
system_prompt: str = SYSTEM_PROMPT
next_step_prompt: str = NEXT_STEP_PROMPT
available_tools: ToolCollection = ToolCollection(
CreateChatCompletion(), Terminate()
)
tool_choices: Literal["none", "auto", "required"] = "auto"
special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
tool_calls: List[ToolCall] = Field(default_factory=list)
max_steps: int = 30
async def think(self) -> bool:
"""Process current state and decide next actions using tools"""
if self.next_step_prompt:
user_msg = Message.user_message(self.next_step_prompt)
self.messages += [user_msg]
# Get response with tool options
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=[Message.system_message(self.system_prompt)]
if self.system_prompt
else None,
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
self.tool_calls = response.tool_calls
# Log response info in a more engaging way
logger.info(f"✨ AI的思考过程{response.content}")
logger.info(
f"🛠️ AI选择了 {len(response.tool_calls) if response.tool_calls else 0} 个工具来解决问题"
)
if response.tool_calls:
logger.info(
f"🧰 准备使用的工具箱:{[call.function.name for call in response.tool_calls]}"
)
try:
# Handle different tool_choices modes
if self.tool_choices == "none":
if response.tool_calls:
logger.warning("🤔 咦AI想用工具但现在不允许使用任何工具呢")
if response.content:
self.memory.add_message(Message.assistant_message(response.content))
return True
return False
# Create and add assistant message
assistant_msg = (
Message.from_tool_calls(
content=response.content, tool_calls=self.tool_calls
)
if self.tool_calls
else Message.assistant_message(response.content)
)
self.memory.add_message(assistant_msg)
if self.tool_choices == "required" and not self.tool_calls:
return True # Will be handled in act()
# For 'auto' mode, continue with content if no commands but content exists
if self.tool_choices == "auto" and not self.tool_calls:
return bool(response.content)
return bool(self.tool_calls)
except Exception as e:
logger.error(f"🚨 糟糕AI思考时遇到了一点小问题{e}")
self.memory.add_message(
Message.assistant_message(
f"Error encountered while processing: {str(e)}"
)
)
return False
async def act(self) -> str:
"""Execute tool calls and handle their results"""
if not self.tool_calls:
if self.tool_choices == "required":
raise ValueError(TOOL_CALL_REQUIRED)
# Return last message content if no tool calls
return self.messages[-1].content or "No content or commands to execute"
results = []
for command in self.tool_calls:
result = await self.execute_tool(command)
logger.info(
f"🎯 工具「{command.function.name}」完成任务啦!结果:{result[:100]}{'...' if len(result) > 100 else ''}"
)
# Add tool response to memory
tool_msg = Message.tool_message(
content=result, tool_call_id=command.id, name=command.function.name
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)
async def execute_tool(self, command: ToolCall) -> str:
"""Execute a single tool call with robust error handling"""
if not command or not command.function or not command.function.name:
return "Error: Invalid command format"
name = command.function.name
if name not in self.available_tools.tool_map:
return f"Error: Unknown tool '{name}'"
try:
# Parse arguments
args = json.loads(command.function.arguments or "{}")
# Execute the tool
logger.info(f"🔧 正在启动工具:「{name}」...")
result = await self.available_tools.execute(name=name, tool_input=args)
# Format result for display
observation = (
f"Observed output of cmd `{name}` executed:\n{str(result)}"
if result
else f"Cmd `{name}` completed with no output"
)
# Handle special tools like `finish`
await self._handle_special_tool(name=name, result=result)
return observation
except json.JSONDecodeError:
error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
logger.error(f"📝 哎呀!「{name}」工具的参数格式不对 - JSON解析失败")
return f"Error: {error_msg}"
except Exception as e:
error_msg = f"Error executing tool {name}: {str(e)}"
logger.error(f"⚠️ 工具「{name}」执行时遇到了一点小麻烦:{str(e)}")
return f"Error: {error_msg}"
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
"""Handle special tool execution and state changes"""
if not self._is_special_tool(name):
return
if self._should_finish_execution(name=name, result=result, **kwargs):
# Set agent state to finished
logger.info(f"🏁 特殊工具「{name}」已完成任务AI助手的工作告一段落")
self.state = AgentState.FINISHED
@staticmethod
def _should_finish_execution(**kwargs) -> bool:
"""Determine if tool execution should finish the agent"""
return True
def _is_special_tool(self, name: str) -> bool:
"""Check if tool name is in special tools list"""
return name.lower() in [n.lower() for n in self.special_tool_names]

View File

@ -2,43 +2,47 @@ from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, List, Optional, Union
from pydantic import BaseModel
from app.agent.base import BaseAgent
from app.agent.toolcall import ToolCallAgent
class FlowType(str, Enum):
PLANNING = "planning"
class BaseFlow(ABC):
class BaseFlow(BaseModel, ABC):
"""Base class for execution flows supporting multiple agents"""
agents: Dict[str, BaseAgent]
tools: Optional[List] = None
primary_agent_key: Optional[str] = None
class Config:
arbitrary_types_allowed = True
def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **kwargs
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
):
# Handle different ways of providing agents
if isinstance(agents, BaseAgent):
self.agents = {"default": agents}
agents_dict = {"default": agents}
elif isinstance(agents, list):
self.agents = {f"agent_{i}": agent for i, agent in enumerate(agents)}
agents_dict = {f"agent_{i}": agent for i, agent in enumerate(agents)}
else:
self.agents = agents
self.tools = kwargs.get("tools")
self.primary_agent_key = kwargs.get("primary_agent", None)
agents_dict = agents
# If primary agent not specified, use first agent
if not self.primary_agent_key and self.agents:
self.primary_agent_key = next(iter(self.agents))
primary_key = data.get("primary_agent_key")
if not primary_key and agents_dict:
primary_key = next(iter(agents_dict))
data["primary_agent_key"] = primary_key
self._setup_agents()
# Set the agents dictionary
data["agents"] = agents_dict
def _setup_agents(self):
"""Configure all agents with tools and initial setup"""
if self.tools:
for agent_key, agent in self.agents.items():
if isinstance(agent, ToolCallAgent):
agent.available_tools = self.tools
# Initialize using BaseModel's init
super().__init__(**data)
@property
def primary_agent(self) -> Optional[BaseAgent]:
@ -52,8 +56,6 @@ class BaseFlow(ABC):
def add_agent(self, key: str, agent: BaseAgent) -> None:
"""Add a new agent to the flow"""
self.agents[key] = agent
if isinstance(agent, ToolCallAgent) and self.tools:
agent.available_tools = self.tools
@abstractmethod
async def execute(self, input_text: str) -> str:

View File

@ -1,67 +1,48 @@
import json
import time
from typing import Dict, List, Optional, Union
from pydantic import Field
from app.agent.base import BaseAgent
from app.flow.base import BaseFlow
from app.llm import LLM
from app.logger import logger
from app.schema import AgentState
from app.tool import PlanningTool, ToolCollection
from app.schema import AgentState, Message
from app.tool import PlanningTool
class PlanningFlow(BaseFlow):
"""A flow that manages planning and execution of tasks using agents."""
llm: LLM = Field(default_factory=lambda: LLM())
planning_tool: PlanningTool = Field(default_factory=PlanningTool)
executor_keys: List[str] = Field(default_factory=list)
active_plan_id: str = Field(default_factory=lambda: f"plan_{int(time.time())}")
current_step_index: Optional[int] = None
def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **kwargs
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
):
# Initialize planning tool first
self.planning_tool = self._initialize_planning_tool(kwargs.get("tools"))
# Set executor keys before super().__init__
if "executors" in data:
data["executor_keys"] = data.pop("executors")
# If tools were provided, ensure planning tool is included
tools = kwargs.get("tools")
if tools:
planning_tool_exists = any(
isinstance(tool, PlanningTool) for tool in tools.tools
)
if not planning_tool_exists:
tools.add_tool(self.planning_tool)
else:
# Create a new tool collection with at least the planning tool
tools = ToolCollection(self.planning_tool)
kwargs["tools"] = tools
# Set plan ID if provided
if "plan_id" in data:
data["active_plan_id"] = data.pop("plan_id")
super().__init__(agents, **kwargs)
# Initialize the planning tool if not provided
if "planning_tool" not in data:
planning_tool = PlanningTool()
data["planning_tool"] = planning_tool
# Define agent roles
self.planner_key = kwargs.get("planner", self.primary_agent_key)
self.executor_keys = kwargs.get("executors", list(self.agents.keys()))
# Call parent's init with the processed data
super().__init__(agents, **data)
# Planning state tracking
self.active_plan_id = kwargs.get("plan_id", f"plan_{int(time.time())}")
self.current_step_index = None
# Ensure the planning tool has been initialized properly
if not hasattr(self.planning_tool, "_plans"):
self.planning_tool._plans = {}
def _initialize_planning_tool(
self, tools: Optional[ToolCollection]
) -> PlanningTool:
"""Initialize planning tool, reusing existing one if available"""
if tools:
for tool in tools.tools:
if isinstance(tool, PlanningTool):
return tool
return PlanningTool()
@property
def planner(self) -> Optional[BaseAgent]:
"""Get the planning agent"""
return (
self.agents.get(self.planner_key)
if self.planner_key
else self.primary_agent
)
# Set executor_keys to all agent keys if not specified
if not self.executor_keys:
self.executor_keys = list(self.agents.keys())
def get_executor(self, step_type: Optional[str] = None) -> BaseAgent:
"""
@ -91,7 +72,7 @@ class PlanningFlow(BaseFlow):
await self._create_initial_plan(input_text)
# Verify plan was created successfully
if self.active_plan_id not in self.planning_tool._plans:
if self.active_plan_id not in self.planning_tool.plans:
logger.error(
f"Plan creation failed. Plan ID {self.active_plan_id} not found in planning tool."
)
@ -123,76 +104,61 @@ class PlanningFlow(BaseFlow):
return f"Execution failed: {str(e)}"
async def _create_initial_plan(self, request: str) -> None:
"""Create an initial plan based on the request using an appropriate agent."""
"""Create an initial plan based on the request using the flow's LLM and PlanningTool."""
logger.info(f"Creating initial plan with ID: {self.active_plan_id}")
agent = self.planner if self.planner else self.primary_agent
# Create a system message for plan creation
system_message = Message.system_message(
"You are a planning assistant. Your task is to create a detailed plan with clear steps."
)
# First, directly create an empty plan to ensure the plan ID exists
self.planning_tool._plans[self.active_plan_id] = {
"title": f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}",
"description": f"Auto-generated plan for request: {request}",
"steps": [],
"step_status": {},
"created_at": time.time(),
"updated_at": time.time(),
}
# Create a user message with the request
user_message = Message.user_message(
f"Create a detailed plan to accomplish this task: {request}"
)
# Use agent.run to create the plan
plan_prompt = f"""
I need you to create a detailed plan to accomplish this task:
# Call LLM with PlanningTool
response = await self.llm.ask_tool(
messages=[user_message],
system_msgs=[system_message],
tools=[self.planning_tool.to_param()],
tool_choice="required",
)
{request}
# Process tool calls if present
if response.tool_calls:
for tool_call in response.tool_calls:
if tool_call.function.name == "planning":
# Parse the arguments
args = tool_call.function.arguments
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {args}")
continue
Please create a plan with ID {self.active_plan_id} using the planning tool.
The plan should include all necessary steps to complete the task.
"""
# Ensure plan_id is set correctly and execute the tool
args["plan_id"] = self.active_plan_id
try:
plan_result = await agent.run(plan_prompt)
logger.info(f"Plan creation result: {plan_result[:200]}...")
# Execute the tool via ToolCollection instead of directly
result = await self.planning_tool.execute(**args)
# Verify the plan was created
if (
self.active_plan_id not in self.planning_tool._plans
or not self.planning_tool._plans[self.active_plan_id].get("steps")
):
logger.warning(
"Plan may not have been created properly. Creating default plan."
)
await self._create_default_plan(request)
except Exception as e:
logger.error(f"Error creating plan: {e}")
await self._create_default_plan(request)
logger.info(f"Plan creation result: {str(result)}")
return
async def _create_default_plan(self, request: str) -> None:
"""Create a default plan if the agent fails to create one."""
try:
# Try using the planning tool directly
await self.planning_tool.execute(
command="create",
plan_id=self.active_plan_id,
title=f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}",
description=f"Auto-generated plan for request: {request}",
steps=["Analyze request", "Execute task", "Verify results"],
)
except Exception as e:
logger.error(f"Failed to create default plan with planning tool: {e}")
# Create plan directly in the planning tool's storage
self.planning_tool._plans[self.active_plan_id] = {
"title": f"Emergency Plan for: {request[:50]}{'...' if len(request) > 50 else ''}",
"description": f"Emergency auto-generated plan for request: {request}",
# If execution reached here, create a default plan
logger.warning("Creating default plan")
# Create default plan using the ToolCollection
await self.planning_tool.execute(
**{
"command": "create",
"plan_id": self.active_plan_id,
"title": f"Plan for: {request[:50]}{'...' if len(request) > 50 else ''}",
"steps": ["Analyze request", "Execute task", "Verify results"],
"step_status": {
"0": "not_started",
"1": "not_started",
"2": "not_started",
},
"created_at": time.time(),
"updated_at": time.time(),
}
logger.info(f"Created default plan with ID: {self.active_plan_id}")
)
async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]:
"""
@ -201,20 +167,24 @@ class PlanningFlow(BaseFlow):
"""
if (
not self.active_plan_id
or self.active_plan_id not in self.planning_tool._plans
or self.active_plan_id not in self.planning_tool.plans
):
logger.error(f"Plan with ID {self.active_plan_id} not found")
return None, None
try:
# Direct access to step status from planning tool storage
plan_data = self.planning_tool._plans[self.active_plan_id]
# Direct access to plan data from planning tool storage
plan_data = self.planning_tool.plans[self.active_plan_id]
steps = plan_data.get("steps", [])
step_status = plan_data.get("step_status", {})
step_statuses = plan_data.get("step_statuses", [])
# Find first non-completed step
for i, step in enumerate(steps):
status = step_status.get(str(i), "not_started")
if i >= len(step_statuses):
status = "not_started"
else:
status = step_statuses[i]
if status in ["not_started", "in_progress"]:
# Extract step type/category if available
step_info = {"text": step}
@ -236,10 +206,15 @@ class PlanningFlow(BaseFlow):
)
except Exception as e:
logger.warning(f"Error marking step as in_progress: {e}")
# Update step status directly
step_status[str(i)] = "in_progress"
plan_data["step_status"] = step_status
plan_data["updated_at"] = time.time()
# Update step status directly if needed
if i < len(step_statuses):
step_statuses[i] = "in_progress"
else:
while len(step_statuses) < i:
step_statuses.append("not_started")
step_statuses.append("in_progress")
plan_data["step_statuses"] = step_statuses
return i, step_info
@ -297,12 +272,17 @@ class PlanningFlow(BaseFlow):
except Exception as e:
logger.warning(f"Failed to update plan status: {e}")
# Update step status directly in planning tool storage
if self.active_plan_id in self.planning_tool._plans:
plan_data = self.planning_tool._plans[self.active_plan_id]
step_status = plan_data.get("step_status", {})
step_status[str(self.current_step_index)] = "completed"
plan_data["step_status"] = step_status
plan_data["updated_at"] = time.time()
if self.active_plan_id in self.planning_tool.plans:
plan_data = self.planning_tool.plans[self.active_plan_id]
step_statuses = plan_data.get("step_statuses", [])
# Ensure the step_statuses list is long enough
while len(step_statuses) <= self.current_step_index:
step_statuses.append("not_started")
# Update the status
step_statuses[self.current_step_index] = "completed"
plan_data["step_statuses"] = step_statuses
async def _get_plan_text(self) -> str:
"""Get the current plan as formatted text."""
@ -318,14 +298,20 @@ class PlanningFlow(BaseFlow):
def _generate_plan_text_from_storage(self) -> str:
"""Generate plan text directly from storage if the planning tool fails."""
try:
if self.active_plan_id not in self.planning_tool._plans:
if self.active_plan_id not in self.planning_tool.plans:
return f"Error: Plan with ID {self.active_plan_id} not found"
plan_data = self.planning_tool._plans[self.active_plan_id]
plan_data = self.planning_tool.plans[self.active_plan_id]
title = plan_data.get("title", "Untitled Plan")
description = plan_data.get("description", "")
steps = plan_data.get("steps", [])
step_status = plan_data.get("step_status", {})
step_statuses = plan_data.get("step_statuses", [])
step_notes = plan_data.get("step_notes", [])
# Ensure step_statuses and step_notes match the number of steps
while len(step_statuses) < len(steps):
step_statuses.append("not_started")
while len(step_notes) < len(steps):
step_notes.append("")
# Count steps by status
status_counts = {
@ -334,7 +320,8 @@ class PlanningFlow(BaseFlow):
"blocked": 0,
"not_started": 0,
}
for status in step_status.values():
for status in step_statuses:
if status in status_counts:
status_counts[status] += 1
@ -344,7 +331,7 @@ class PlanningFlow(BaseFlow):
plan_text = f"Plan: {title} (ID: {self.active_plan_id})\n"
plan_text += "=" * len(plan_text) + "\n\n"
plan_text += f"{description}\n\n" if description else ""
plan_text += (
f"Progress: {completed}/{total} steps completed ({progress:.1f}%)\n"
)
@ -352,8 +339,9 @@ class PlanningFlow(BaseFlow):
plan_text += f"{status_counts['blocked']} blocked, {status_counts['not_started']} not started\n\n"
plan_text += "Steps:\n"
for i, step in enumerate(steps):
status = step_status.get(str(i), "not_started")
for i, (step, status, notes) in enumerate(
zip(steps, step_statuses, step_notes)
):
if status == "completed":
status_mark = "[✓]"
elif status == "in_progress":
@ -364,39 +352,48 @@ class PlanningFlow(BaseFlow):
status_mark = "[ ]"
plan_text += f"{i}. {status_mark} {step}\n"
if notes:
plan_text += f" Notes: {notes}\n"
return plan_text
except Exception as e:
logger.error(f"Error generating plan text from storage: {e}")
return f"Error: Unable to retrieve plan with ID {self.active_plan_id}"
async def _get_plan(self) -> dict:
"""Get the current plan as a dictionary."""
if (
not self.active_plan_id
or self.active_plan_id not in self.planning_tool._plans
):
return {}
return self.planning_tool._plans[self.active_plan_id]
async def _finalize_plan(self) -> str:
"""Finalize the plan and provide a summary using an appropriate agent."""
agent = self.planner if self.planner else self.primary_agent
"""Finalize the plan and provide a summary using the flow's LLM directly."""
plan_text = await self._get_plan_text()
# Create a summary prompt
summary_prompt = f"""
The plan has been completed. Here is the final plan status:
{plan_text}
Please provide a summary of what was accomplished and any final thoughts.
"""
# Use agent.run() to generate the summary
# Create a summary using the flow's LLM directly
try:
summary = await agent.run(summary_prompt)
return f"Plan completed:\n\n{summary}"
system_message = Message.system_message(
"You are a planning assistant. Your task is to summarize the completed plan."
)
user_message = Message.user_message(
f"The plan has been completed. Here is the final plan status:\n\n{plan_text}\n\nPlease provide a summary of what was accomplished and any final thoughts."
)
response = await self.llm.ask(
messages=[user_message], system_msgs=[system_message]
)
return f"Plan completed:\n\n{response}"
except Exception as e:
logger.error(f"Error finalizing plan: {e}")
return "Plan completed. Error generating summary."
logger.error(f"Error finalizing plan with LLM: {e}")
# Fallback to using an agent for the summary
try:
agent = self.primary_agent
summary_prompt = f"""
The plan has been completed. Here is the final plan status:
{plan_text}
Please provide a summary of what was accomplished and any final thoughts.
"""
summary = await agent.run(summary_prompt)
return f"Plan completed:\n\n{summary}"
except Exception as e2:
logger.error(f"Error finalizing plan with agent: {e2}")
return "Plan completed. Error generating summary."

View File

@ -66,10 +66,9 @@ class PlanningTool(BaseTool):
"additionalProperties": False,
}
_plans = {} # Dictionary to store plans by plan_id
_current_plan_id = None # Track the current active plan
plans: dict = {} # Dictionary to store plans by plan_id
_current_plan_id: Optional[str] = None # Track the current active plan
# async def __call__(
async def execute(
self,
*,
@ -125,7 +124,7 @@ class PlanningTool(BaseTool):
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: create")
if plan_id in self._plans:
if plan_id in self.plans:
raise ToolError(
f"A plan with ID '{plan_id}' already exists. Use 'update' to modify existing plans."
)
@ -151,7 +150,7 @@ class PlanningTool(BaseTool):
"step_notes": [""] * len(steps),
}
self._plans[plan_id] = plan
self.plans[plan_id] = plan
self._current_plan_id = plan_id # Set as active plan
return ToolResult(
@ -165,10 +164,10 @@ class PlanningTool(BaseTool):
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: update")
if plan_id not in self._plans:
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
plan = self._plans[plan_id]
plan = self.plans[plan_id]
if title:
plan["title"] = title
@ -209,13 +208,13 @@ class PlanningTool(BaseTool):
def _list_plans(self) -> ToolResult:
"""List all available plans."""
if not self._plans:
if not self.plans:
return ToolResult(
output="No plans available. Create a plan with the 'create' command."
)
output = "Available plans:\n"
for plan_id, plan in self._plans.items():
for plan_id, plan in self.plans.items():
current_marker = " (active)" if plan_id == self._current_plan_id else ""
completed = sum(
1 for status in plan["step_statuses"] if status == "completed"
@ -236,10 +235,10 @@ class PlanningTool(BaseTool):
)
plan_id = self._current_plan_id
if plan_id not in self._plans:
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
plan = self._plans[plan_id]
plan = self.plans[plan_id]
return ToolResult(output=self._format_plan(plan))
def _set_active_plan(self, plan_id: Optional[str]) -> ToolResult:
@ -247,12 +246,12 @@ class PlanningTool(BaseTool):
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: set_active")
if plan_id not in self._plans:
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
self._current_plan_id = plan_id
return ToolResult(
output=f"Plan '{plan_id}' is now the active plan.\n\n{self._format_plan(self._plans[plan_id])}"
output=f"Plan '{plan_id}' is now the active plan.\n\n{self._format_plan(self.plans[plan_id])}"
)
def _mark_step(
@ -271,13 +270,13 @@ class PlanningTool(BaseTool):
)
plan_id = self._current_plan_id
if plan_id not in self._plans:
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
if step_index is None:
raise ToolError("Parameter `step_index` is required for command: mark_step")
plan = self._plans[plan_id]
plan = self.plans[plan_id]
if step_index < 0 or step_index >= len(plan["steps"]):
raise ToolError(
@ -309,10 +308,10 @@ class PlanningTool(BaseTool):
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: delete")
if plan_id not in self._plans:
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
del self._plans[plan_id]
del self.plans[plan_id]
# If the deleted plan was the active plan, clear the active plan
if self._current_plan_id == plan_id: