From de5ca60cf70de8b41ab5eba2b9d6222a7609782a Mon Sep 17 00:00:00 2001 From: gantnocap Date: Tue, 11 Mar 2025 13:21:07 +0800 Subject: [PATCH] fix format --- app.py | 100 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 51 insertions(+), 49 deletions(-) diff --git a/app.py b/app.py index 7fcac28..0f7425b 100644 --- a/app.py +++ b/app.py @@ -24,6 +24,7 @@ app.add_middleware( allow_headers=["*"], ) + class Task(BaseModel): id: str prompt: str @@ -33,9 +34,10 @@ class Task(BaseModel): def model_dump(self, *args, **kwargs): data = super().model_dump(*args, **kwargs) - data['created_at'] = self.created_at.isoformat() + data["created_at"] = self.created_at.isoformat() return data + class TaskManager: def __init__(self): self.tasks = {} @@ -44,61 +46,55 @@ class TaskManager: def create_task(self, prompt: str) -> Task: task_id = str(uuid.uuid4()) task = Task( - id=task_id, - prompt=prompt, - created_at=datetime.now(), - status="pending" + id=task_id, prompt=prompt, created_at=datetime.now(), status="pending" ) self.tasks[task_id] = task self.queues[task_id] = asyncio.Queue() return task - async def update_task_step(self, task_id: str, step: int, result: str, step_type: str = "step"): + async def update_task_step( + self, task_id: str, step: int, result: str, step_type: str = "step" + ): if task_id in self.tasks: task = self.tasks[task_id] task.steps.append({"step": step, "result": result, "type": step_type}) - await self.queues[task_id].put({ - "type": step_type, - "step": step, - "result": result - }) - await self.queues[task_id].put({ - "type": "status", - "status": task.status, - "steps": task.steps - }) + await self.queues[task_id].put( + {"type": step_type, "step": step, "result": result} + ) + await self.queues[task_id].put( + {"type": "status", "status": task.status, "steps": task.steps} + ) async def complete_task(self, task_id: str): if task_id in self.tasks: task = self.tasks[task_id] task.status = "completed" - await self.queues[task_id].put({ - "type": "status", - "status": task.status, - "steps": task.steps - }) + await self.queues[task_id].put( + {"type": "status", "status": task.status, "steps": task.steps} + ) await self.queues[task_id].put({"type": "complete"}) async def fail_task(self, task_id: str, error: str): if task_id in self.tasks: self.tasks[task_id].status = f"failed: {error}" - await self.queues[task_id].put({ - "type": "error", - "message": error - }) + await self.queues[task_id].put({"type": "error", "message": error}) + task_manager = TaskManager() + @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) + @app.post("/tasks") async def create_task(prompt: str = Body(..., embed=True)): task = task_manager.create_task(prompt) asyncio.create_task(run_task(task.id, prompt)) return {"task_id": task.id} + from app.agent.manus import Manus @@ -109,17 +105,21 @@ async def run_task(task_id: str, prompt: str): agent = Manus( name="Manus", description="A versatile agent that can solve various tasks using multiple tools", - max_steps=30 + max_steps=30, ) async def on_think(thought): await task_manager.update_task_step(task_id, 0, thought, "think") async def on_tool_execute(tool, input): - await task_manager.update_task_step(task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool") + await task_manager.update_task_step( + task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool" + ) async def on_action(action): - await task_manager.update_task_step(task_id, 0, f"Executing action: {action}", "act") + await task_manager.update_task_step( + task_id, 0, f"Executing action: {action}", "act" + ) async def on_run(step, result): await task_manager.update_task_step(task_id, step, result, "run") @@ -134,7 +134,7 @@ async def run_task(task_id: str, prompt: str): import re # 提取 - 后面的内容 - cleaned_message = re.sub(r'^.*? - ', '', message) + cleaned_message = re.sub(r"^.*? - ", "", message) event_type = "log" if "✨ Manus's thoughts:" in cleaned_message: @@ -148,7 +148,9 @@ async def run_task(task_id: str, prompt: str): elif "🏁 Special tool" in cleaned_message: event_type = "complete" - await task_manager.update_task_step(self.task_id, 0, cleaned_message, event_type) + await task_manager.update_task_step( + self.task_id, 0, cleaned_message, event_type + ) sse_handler = SSELogHandler(task_id) logger.add(sse_handler) @@ -159,6 +161,7 @@ async def run_task(task_id: str, prompt: str): except Exception as e: await task_manager.fail_task(task_id, str(e)) + @app.get("/tasks/{task_id}/events") async def task_events(task_id: str): async def event_generator(): @@ -170,11 +173,8 @@ async def task_events(task_id: str): task = task_manager.tasks.get(task_id) if task: - yield f"event: status\ndata: {dumps({ - 'type': 'status', - 'status': task.status, - 'steps': task.steps - })}\n\n" + status_data = {"type": "status", "status": task.status, "steps": task.steps} + yield f"event: status\ndata: {dumps(status_data)}\n\n" while True: try: @@ -192,11 +192,12 @@ async def task_events(task_id: str): elif event["type"] == "step": task = task_manager.tasks.get(task_id) if task: - yield f"event: status\ndata: {dumps({ - 'type': 'status', - 'status': task.status, - 'steps': task.steps - })}\n\n" + status_data = { + "type": "status", + "status": task.status, + "steps": task.steps, + } + yield f"event: status\ndata: {dumps(status_data)}\n\n" yield f"event: {event['type']}\ndata: {formatted_event}\n\n" elif event["type"] in ["think", "tool", "act", "run"]: yield f"event: {event['type']}\ndata: {formatted_event}\n\n" @@ -217,36 +218,37 @@ async def task_events(task_id: str): headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", - "X-Accel-Buffering": "no" - } + "X-Accel-Buffering": "no", + }, ) + @app.get("/tasks") async def get_tasks(): sorted_tasks = sorted( - task_manager.tasks.values(), - key=lambda task: task.created_at, - reverse=True + task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True ) return JSONResponse( content=[task.model_dump() for task in sorted_tasks], - headers={"Content-Type": "application/json"} + headers={"Content-Type": "application/json"}, ) + @app.get("/tasks/{task_id}") async def get_task(task_id: str): if task_id not in task_manager.tasks: raise HTTPException(status_code=404, detail="Task not found") return task_manager.tasks[task_id] + @app.exception_handler(Exception) async def generic_exception_handler(request: Request, exc: Exception): return JSONResponse( - status_code=500, - content={"message": f"Server error: {str(exc)}"} + status_code=500, content={"message": f"Server error: {str(exc)}"} ) + if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="localhost", port=5172) + uvicorn.run(app, host="localhost", port=5172)