1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
| from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import List, Optional, Dict, Any from enum import Enum import uuid from datetime import datetime
# ASGI application object; every route below registers itself on it
# through the @app.get / @app.post decorators.
app = FastAPI()
class TaskState(str, Enum):
    """Lifecycle states of a task.

    The ``str`` mixin makes every member a real string, so a state compares
    equal to its wire value (``TaskState.COMPLETED == "completed"``) and can
    be serialized by the standard ``json`` module without a custom encoder.
    Members remain ``Enum`` instances, so lookups such as
    ``TaskState("working")`` and identity comparisons keep working.
    """

    # Task has been accepted but processing has not started yet.
    SUBMITTED = "submitted"
    # Task is currently being processed.
    WORKING = "working"
    # Processing is paused until more input is supplied.
    INPUT_REQUIRED = "input_required"
    # Processing finished and results are available.
    COMPLETED = "completed"
    # Task was canceled on request.
    CANCELED = "canceled"
    # Processing ended with an error.
    FAILED = "failed"
class Part(BaseModel):
    """One content fragment of a message."""

    # Content kind; process_task treats "text" and "text/plain" as text parts.
    type: str
    # Text payload; read by process_task for text parts.
    text: Optional[str] = None
    # Structured payload; not read anywhere in the visible code.
    data: Optional[Dict[str, Any]] = None
class Message(BaseModel):
    """A message exchanged within a task, composed of a list of parts."""

    # Author of the message; the server writes "agent" for its own replies.
    # NOTE(review): client messages presumably use "user" -- confirm with callers.
    role: str
    # Content fragments in the order they were supplied.
    parts: List[Part]
class TaskStatus(BaseModel):
    """Snapshot of a task's state at one point in time."""

    # Current lifecycle state of the task.
    state: TaskState
    # Optional human-readable detail accompanying the state.
    message: Optional[str] = None
    # ISO-8601 string (callers in this file pass datetime.now().isoformat()).
    # Declared Optional because the default is None; a bare ``str`` annotation
    # with a None default contradicts itself and yields a misleading schema.
    timestamp: Optional[str] = None
class Task(BaseModel):
    """Server-side record of a task: identity, status, results and history."""

    # Unique task identifier; also the key in the module-level ``tasks`` store.
    id: str
    # Session the task belongs to; send_task generates one when none is given.
    sessionId: Optional[str] = None
    # Latest status snapshot; replaced wholesale on each state change.
    status: TaskStatus
    # Result artifacts as plain dicts; process_task overwrites this list.
    # NOTE(review): relies on pydantic copying mutable defaults per instance --
    # confirm for the pinned pydantic version.
    artifacts: List[Dict] = []
    # Messages appended by send_task (incoming) and process_task (agent reply).
    history: List[Message] = []
class SendTaskRequest(BaseModel):
    """Request body of POST /tasks/send."""

    # Task to continue; when omitted, send_task generates a fresh UUID.
    id: Optional[str] = None
    # Session to attach a new task to; generated by send_task when omitted.
    sessionId: Optional[str] = None
    # The message to process.
    message: Message
    # Output modes the client accepts; not read anywhere in the visible code.
    acceptedOutputModes: List[str] = ["text/plain"]
# Module-level, in-memory registry of all known tasks, keyed by task id.
# send_task inserts into it; get_task and cancel_task look tasks up in it.
tasks: Dict[str, Task] = {}
@app.get("/.well-known/agent.json")
async def get_agent_card():
    """Return the static agent card describing this weather agent.

    The card carries the agent's name, description, URL, version, capability
    flags and the list of skills it offers.
    """
    capability_flags = {
        "streaming": True,
        "pushNotifications": False,
    }
    weather_skill = {
        "id": "get_weather",
        "name": "查询天气",
        "description": "查询指定城市的当前天气和预报",
        "examples": [
            "北京今天天气怎么样",
            "查询上海未来三天的天气",
        ],
    }
    card = {
        "name": "WeatherAgent",
        "description": "提供全球天气查询服务",
        "url": "https://weather-agent.example.com",
        "version": "1.0.0",
        "capabilities": capability_flags,
        "skills": [weather_skill],
    }
    return card
@app.post("/tasks/send")
async def send_task(request: SendTaskRequest):
    """Create a task, or continue an existing one, and process its message.

    When ``request.id`` names a known task, the incoming message is appended
    to that task's history. Otherwise a new task is created (with generated
    ids where the request omits them) and stored. The message is then
    processed, and the task's id, session id, status and artifacts are
    returned.
    """
    task_id = request.id or str(uuid.uuid4())

    # Single lookup instead of a membership test followed by indexing.
    task = tasks.get(task_id)
    if task is None:
        task = Task(
            id=task_id,
            sessionId=request.sessionId or str(uuid.uuid4()),
            status=TaskStatus(
                state=TaskState.SUBMITTED,
                timestamp=datetime.now().isoformat(),
            ),
            history=[request.message],
        )
        tasks[task_id] = task
    else:
        task.history.append(request.message)

    # process_task mutates and returns the same object, so its return value
    # is not needed here.
    await process_task(task, request.message)

    return {
        "id": task.id,
        "sessionId": task.sessionId,
        "status": task.status.dict(),
        "artifacts": task.artifacts,
    }
async def process_task(task: Task, message: Message) -> Task:
    """Run the weather lookup for ``message`` and record the outcome on ``task``.

    The task is marked WORKING, the query text is taken from the message, the
    weather result is fetched, and the task is then marked COMPLETED with the
    result stored both as an artifact and as an agent message in the history.

    Args:
        task: The task to update in place.
        message: The incoming message whose text part is used as the query.

    Returns:
        The same ``task`` object, after mutation.
    """
    task.status = TaskStatus(
        state=TaskState.WORKING,
        message="正在查询天气信息...",
        timestamp=datetime.now().isoformat(),
    )

    # Use the first text part that actually carries text. A text part whose
    # ``text`` is None used to leak None into the query (rendered as the
    # literal "None" in the reply); fall back to an empty query instead.
    user_text = next(
        (
            part.text
            for part in message.parts
            if part.type in ("text", "text/plain") and part.text is not None
        ),
        "",
    )

    weather_result = get_weather_data(user_text)

    task.status = TaskStatus(
        state=TaskState.COMPLETED,
        timestamp=datetime.now().isoformat(),
    )

    # Replace any previous artifacts with the result of this run.
    task.artifacts = [
        {
            "name": "weather_info",
            "parts": [{"type": "text/plain", "text": weather_result}],
        }
    ]

    task.history.append(
        Message(
            role="agent",
            parts=[Part(type="text", text=weather_result)],
        )
    )

    return task
def get_weather_data(query: str) -> str:
    """Return a mock weather report that echoes ``query``.

    This is a stand-in for a real weather lookup: the report text is fixed
    and only the quoted query varies.
    """
    report_template = "根据您的查询'{}',北京今天晴,气温15-23°C,适合外出。"
    return report_template.format(query)
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    """Return the stored task with the given id.

    Raises:
        HTTPException: 404 when no task with that id exists.
    """
    found = tasks.get(task_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return found
@app.post("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str):
    """Cancel the task with the given id.

    A task that is already canceled is left untouched and the call succeeds
    again, so retries are harmless. A task that has already completed or
    failed is not rewritten: marking it CANCELED would contradict the result
    artifacts and history it already carries.

    Returns:
        ``{"status": "canceled"}`` once the task is in the CANCELED state.

    Raises:
        HTTPException: 404 when the task does not exist; 409 when the task
            has already completed or failed.
    """
    task = tasks.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    current_state = task.status.state

    # Idempotent: a repeated cancel keeps the original cancellation timestamp.
    if current_state == TaskState.CANCELED:
        return {"status": "canceled"}

    # Finished tasks keep their final state instead of being overwritten.
    if current_state in (TaskState.COMPLETED, TaskState.FAILED):
        raise HTTPException(
            status_code=409,
            detail=f"Task is already {current_state.value} and cannot be canceled",
        )

    task.status = TaskStatus(
        state=TaskState.CANCELED,
        message="任务已取消",
        timestamp=datetime.now().isoformat(),
    )
    return {"status": "canceled"}
|