349 lines
11 KiB
Python
349 lines
11 KiB
Python
import asyncio
|
|
import os
|
|
import threading
|
|
import tomllib
|
|
import uuid
|
|
import webbrowser
|
|
from datetime import datetime
|
|
from functools import partial
|
|
from json import dumps
|
|
from pathlib import Path
|
|
|
|
from fastapi import Body, FastAPI, HTTPException, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import (
|
|
FileResponse,
|
|
HTMLResponse,
|
|
JSONResponse,
|
|
StreamingResponse,
|
|
)
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
|
|
|
|
app = FastAPI()
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
class Task(BaseModel):
|
|
id: str
|
|
prompt: str
|
|
created_at: datetime
|
|
status: str
|
|
steps: list = []
|
|
|
|
def model_dump(self, *args, **kwargs):
|
|
data = super().model_dump(*args, **kwargs)
|
|
data["created_at"] = self.created_at.isoformat()
|
|
return data
|
|
|
|
|
|
class TaskManager:
|
|
def __init__(self):
|
|
self.tasks = {}
|
|
self.queues = {}
|
|
|
|
def create_task(self, prompt: str) -> Task:
|
|
task_id = str(uuid.uuid4())
|
|
task = Task(
|
|
id=task_id, prompt=prompt, created_at=datetime.now(), status="pending"
|
|
)
|
|
self.tasks[task_id] = task
|
|
self.queues[task_id] = asyncio.Queue()
|
|
return task
|
|
|
|
async def update_task_step(
|
|
self, task_id: str, step: int, result: str, step_type: str = "step"
|
|
):
|
|
if task_id in self.tasks:
|
|
task = self.tasks[task_id]
|
|
task.steps.append({"step": step, "result": result, "type": step_type})
|
|
await self.queues[task_id].put(
|
|
{"type": step_type, "step": step, "result": result}
|
|
)
|
|
await self.queues[task_id].put(
|
|
{"type": "status", "status": task.status, "steps": task.steps}
|
|
)
|
|
|
|
async def complete_task(self, task_id: str):
|
|
if task_id in self.tasks:
|
|
task = self.tasks[task_id]
|
|
task.status = "completed"
|
|
await self.queues[task_id].put(
|
|
{"type": "status", "status": task.status, "steps": task.steps}
|
|
)
|
|
await self.queues[task_id].put({"type": "complete"})
|
|
|
|
async def fail_task(self, task_id: str, error: str):
|
|
if task_id in self.tasks:
|
|
self.tasks[task_id].status = f"failed: {error}"
|
|
await self.queues[task_id].put({"type": "error", "message": error})
|
|
|
|
|
|
task_manager = TaskManager()
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def index(request: Request):
|
|
return templates.TemplateResponse("index.html", {"request": request})
|
|
|
|
|
|
@app.get("/download")
|
|
async def download_file(file_path: str):
|
|
if not os.path.exists(file_path):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
return FileResponse(file_path, filename=os.path.basename(file_path))
|
|
|
|
|
|
@app.post("/tasks")
|
|
async def create_task(prompt: str = Body(..., embed=True)):
|
|
task = task_manager.create_task(prompt)
|
|
asyncio.create_task(run_task(task.id, prompt))
|
|
return {"task_id": task.id}
|
|
|
|
|
|
from app.agent.manus import Manus
|
|
|
|
|
|
async def run_task(task_id: str, prompt: str):
|
|
try:
|
|
task_manager.tasks[task_id].status = "running"
|
|
|
|
agent = Manus(
|
|
name="Manus",
|
|
description="A versatile agent that can solve various tasks using multiple tools",
|
|
)
|
|
|
|
async def on_think(thought):
|
|
await task_manager.update_task_step(task_id, 0, thought, "think")
|
|
|
|
async def on_tool_execute(tool, input):
|
|
await task_manager.update_task_step(
|
|
task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool"
|
|
)
|
|
|
|
async def on_action(action):
|
|
await task_manager.update_task_step(
|
|
task_id, 0, f"Executing action: {action}", "act"
|
|
)
|
|
|
|
async def on_run(step, result):
|
|
await task_manager.update_task_step(task_id, step, result, "run")
|
|
|
|
from app.logger import logger
|
|
|
|
class SSELogHandler:
|
|
def __init__(self, task_id):
|
|
self.task_id = task_id
|
|
|
|
async def __call__(self, message):
|
|
import re
|
|
|
|
# Extract - Subsequent Content
|
|
cleaned_message = re.sub(r"^.*? - ", "", message)
|
|
|
|
event_type = "log"
|
|
if "✨ Manus's thoughts:" in cleaned_message:
|
|
event_type = "think"
|
|
elif "🛠️ Manus selected" in cleaned_message:
|
|
event_type = "tool"
|
|
elif "🎯 Tool" in cleaned_message:
|
|
event_type = "act"
|
|
elif "📝 Oops!" in cleaned_message:
|
|
event_type = "error"
|
|
elif "🏁 Special tool" in cleaned_message:
|
|
event_type = "complete"
|
|
|
|
await task_manager.update_task_step(
|
|
self.task_id, 0, cleaned_message, event_type
|
|
)
|
|
|
|
sse_handler = SSELogHandler(task_id)
|
|
logger.add(sse_handler)
|
|
|
|
result = await agent.run(prompt)
|
|
await task_manager.update_task_step(task_id, 1, result, "result")
|
|
await task_manager.complete_task(task_id)
|
|
except Exception as e:
|
|
await task_manager.fail_task(task_id, str(e))
|
|
|
|
|
|
@app.get("/tasks/{task_id}/events")
|
|
async def task_events(task_id: str):
|
|
async def event_generator():
|
|
if task_id not in task_manager.queues:
|
|
yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
|
|
return
|
|
|
|
queue = task_manager.queues[task_id]
|
|
|
|
task = task_manager.tasks.get(task_id)
|
|
if task:
|
|
yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
|
|
|
|
while True:
|
|
try:
|
|
event = await queue.get()
|
|
formatted_event = dumps(event)
|
|
|
|
yield ": heartbeat\n\n"
|
|
|
|
if event["type"] == "complete":
|
|
yield f"event: complete\ndata: {formatted_event}\n\n"
|
|
break
|
|
elif event["type"] == "error":
|
|
yield f"event: error\ndata: {formatted_event}\n\n"
|
|
break
|
|
elif event["type"] == "step":
|
|
task = task_manager.tasks.get(task_id)
|
|
if task:
|
|
yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
|
|
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
|
|
elif event["type"] in ["think", "tool", "act", "run"]:
|
|
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
|
|
else:
|
|
yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
|
|
|
|
except asyncio.CancelledError:
|
|
print(f"Client disconnected for task {task_id}")
|
|
break
|
|
except Exception as e:
|
|
print(f"Error in event stream: {str(e)}")
|
|
yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
|
|
break
|
|
|
|
return StreamingResponse(
|
|
event_generator(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no",
|
|
},
|
|
)
|
|
|
|
|
|
@app.get("/tasks")
|
|
async def get_tasks():
|
|
sorted_tasks = sorted(
|
|
task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True
|
|
)
|
|
return JSONResponse(
|
|
content=[task.model_dump() for task in sorted_tasks],
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
|
|
|
|
@app.get("/tasks/{task_id}")
|
|
async def get_task(task_id: str):
|
|
if task_id not in task_manager.tasks:
|
|
raise HTTPException(status_code=404, detail="Task not found")
|
|
return task_manager.tasks[task_id]
|
|
|
|
|
|
@app.get("/config/status")
|
|
async def check_config_status():
|
|
config_path = Path(__file__).parent / "config" / "config.toml"
|
|
example_config_path = Path(__file__).parent / "config" / "config.example.toml"
|
|
|
|
if config_path.exists():
|
|
return {"status": "exists"}
|
|
elif example_config_path.exists():
|
|
try:
|
|
with open(example_config_path, "rb") as f:
|
|
example_config = tomllib.load(f)
|
|
return {"status": "missing", "example_config": example_config}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
else:
|
|
return {"status": "no_example"}
|
|
|
|
|
|
@app.post("/config/save")
|
|
async def save_config(config_data: dict = Body(...)):
|
|
try:
|
|
config_dir = Path(__file__).parent / "config"
|
|
config_dir.mkdir(exist_ok=True)
|
|
|
|
config_path = config_dir / "config.toml"
|
|
|
|
toml_content = ""
|
|
|
|
if "llm" in config_data:
|
|
toml_content += "# Global LLM configuration\n[llm]\n"
|
|
llm_config = config_data["llm"]
|
|
for key, value in llm_config.items():
|
|
if key != "vision":
|
|
if isinstance(value, str):
|
|
toml_content += f'{key} = "{value}"\n'
|
|
else:
|
|
toml_content += f"{key} = {value}\n"
|
|
|
|
if "server" in config_data:
|
|
toml_content += "\n# Server configuration\n[server]\n"
|
|
server_config = config_data["server"]
|
|
for key, value in server_config.items():
|
|
if isinstance(value, str):
|
|
toml_content += f'{key} = "{value}"\n'
|
|
else:
|
|
toml_content += f"{key} = {value}\n"
|
|
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
f.write(toml_content)
|
|
|
|
return {"status": "success"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
@app.exception_handler(Exception)
|
|
async def generic_exception_handler(request: Request, exc: Exception):
|
|
return JSONResponse(
|
|
status_code=500, content={"message": f"Server error: {str(exc)}"}
|
|
)
|
|
|
|
|
|
def open_local_browser(config):
|
|
webbrowser.open_new_tab(f"http://{config['host']}:{config['port']}")
|
|
|
|
|
|
def load_config():
|
|
try:
|
|
config_path = Path(__file__).parent / "config" / "config.toml"
|
|
|
|
if not config_path.exists():
|
|
return {"host": "localhost", "port": 5172}
|
|
|
|
with open(config_path, "rb") as f:
|
|
config = tomllib.load(f)
|
|
|
|
return {"host": config["server"]["host"], "port": config["server"]["port"]}
|
|
except FileNotFoundError:
|
|
return {"host": "localhost", "port": 5172}
|
|
except KeyError as e:
|
|
print(
|
|
f"The configuration file is missing necessary fields: {str(e)}, use default configuration"
|
|
)
|
|
return {"host": "localhost", "port": 5172}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
config = load_config()
|
|
open_with_config = partial(open_local_browser, config)
|
|
threading.Timer(3, open_with_config).start()
|
|
uvicorn.run(app, host=config["host"], port=config["port"])
|