92 lines
2.8 KiB
Python
92 lines
2.8 KiB
Python
from abc import ABC, abstractmethod
|
|
from enum import Enum
|
|
from typing import Dict, List, Optional, Union
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from app.agent.base import BaseAgent
|
|
|
|
|
|
class FlowType(str, Enum):
|
|
PLANNING = "planning"
|
|
|
|
|
|
class BaseFlow(BaseModel, ABC):
|
|
"""Base class for execution flows supporting multiple agents"""
|
|
|
|
agents: Dict[str, BaseAgent]
|
|
tools: Optional[List] = None
|
|
primary_agent_key: Optional[str] = None
|
|
|
|
class Config:
|
|
arbitrary_types_allowed = True
|
|
|
|
def __init__(
|
|
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
|
|
):
|
|
# Handle different ways of providing agents
|
|
if isinstance(agents, BaseAgent):
|
|
agents_dict = {"default": agents}
|
|
elif isinstance(agents, list):
|
|
agents_dict = {f"agent_{i}": agent for i, agent in enumerate(agents)}
|
|
else:
|
|
agents_dict = agents
|
|
|
|
# If primary agent not specified, use first agent
|
|
primary_key = data.get("primary_agent_key")
|
|
if not primary_key and agents_dict:
|
|
primary_key = next(iter(agents_dict))
|
|
data["primary_agent_key"] = primary_key
|
|
|
|
# Set the agents dictionary
|
|
data["agents"] = agents_dict
|
|
|
|
# Initialize using BaseModel's init
|
|
super().__init__(**data)
|
|
|
|
@property
|
|
def primary_agent(self) -> Optional[BaseAgent]:
|
|
"""Get the primary agent for the flow"""
|
|
return self.agents.get(self.primary_agent_key)
|
|
|
|
def get_agent(self, key: str) -> Optional[BaseAgent]:
|
|
"""Get a specific agent by key"""
|
|
return self.agents.get(key)
|
|
|
|
def add_agent(self, key: str, agent: BaseAgent) -> None:
|
|
"""Add a new agent to the flow"""
|
|
self.agents[key] = agent
|
|
|
|
@abstractmethod
|
|
async def execute(self, input_text: str) -> str:
|
|
"""Execute the flow with given input"""
|
|
|
|
|
|
class PlanStepStatus(str, Enum):
|
|
"""Enum class defining possible statuses of a plan step"""
|
|
|
|
NOT_STARTED = "not_started"
|
|
IN_PROGRESS = "in_progress"
|
|
COMPLETED = "completed"
|
|
BLOCKED = "blocked"
|
|
|
|
@classmethod
|
|
def get_all_statuses(cls) -> list[str]:
|
|
"""Return a list of all possible step status values"""
|
|
return [status.value for status in cls]
|
|
|
|
@classmethod
|
|
def get_active_statuses(cls) -> list[str]:
|
|
"""Return a list of values representing active statuses (not started or in progress)"""
|
|
return [cls.NOT_STARTED.value, cls.IN_PROGRESS.value]
|
|
|
|
@classmethod
|
|
def get_status_marks(cls) -> Dict[str, str]:
|
|
"""Return a mapping of statuses to their marker symbols"""
|
|
return {
|
|
cls.COMPLETED.value: "[✓]",
|
|
cls.IN_PROGRESS.value: "[→]",
|
|
cls.BLOCKED.value: "[!]",
|
|
cls.NOT_STARTED.value: "[ ]",
|
|
}
|