2025-03-21 17:37:16 +08:00

58 lines
1.8 KiB
Python

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from pydantic import BaseModel
from app.agent.base import BaseAgent
class BaseFlow(BaseModel, ABC):
    """Abstract pydantic model for a flow that coordinates one or more agents.

    Agents are kept in a mapping from string key to agent. One key may be
    designated as the primary agent; when no key is supplied at construction
    time, the first key of the mapping is used. Subclasses must implement
    :meth:`execute`.
    """

    # Registered agents, addressed by key.
    agents: Dict[str, BaseAgent]
    # Optional tool list; not read by any method of this base class.
    tools: Optional[List] = None
    # Key into ``agents`` identifying the primary agent, if any.
    primary_agent_key: Optional[str] = None

    class Config:
        # Permit field types that pydantic cannot validate natively.
        arbitrary_types_allowed = True

    def __init__(
        self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
    ):
        """Build the flow from a single agent, a list of agents, or a mapping.

        Args:
            agents: A lone agent (stored under ``"default"``), a list of
                agents (stored under ``"agent_0"``, ``"agent_1"``, ...), or
                anything else, which is passed through unchanged as the
                mapping.
            **data: Remaining model fields, forwarded to the pydantic
                initializer. ``primary_agent_key`` is filled in with the
                first mapping key when it is missing or falsy and the
                mapping is non-empty.
        """
        # Normalise whichever form was supplied into a key -> agent mapping.
        if isinstance(agents, BaseAgent):
            agent_map = {"default": agents}
        elif isinstance(agents, list):
            agent_map = {
                f"agent_{index}": member for index, member in enumerate(agents)
            }
        else:
            agent_map = agents

        # Fall back to the first registered agent when no primary was chosen.
        if agent_map and not data.get("primary_agent_key"):
            data["primary_agent_key"] = next(iter(agent_map))

        data["agents"] = agent_map

        # Hand the assembled fields to pydantic for validation and assignment.
        super().__init__(**data)

    @property
    def primary_agent(self) -> Optional[BaseAgent]:
        """Return the agent stored under ``primary_agent_key``, or ``None``."""
        selected_key = self.primary_agent_key
        return self.agents.get(selected_key)

    def get_agent(self, key: str) -> Optional[BaseAgent]:
        """Return the agent registered under ``key``, or ``None`` if absent."""
        registry = self.agents
        return registry.get(key)

    def add_agent(self, key: str, agent: BaseAgent) -> None:
        """Register ``agent`` under ``key``, replacing any existing entry."""
        registry = self.agents
        registry[key] = agent

    @abstractmethod
    async def execute(self, input_text: str) -> str:
        """Run the flow on ``input_text`` and return the result as a string."""