MCP HubMCP Hub
스킬 목록으로 돌아가기

implement-a2a-server

pjt222
업데이트됨 2 days ago
3 조회
17
2
17
GitHub에서 보기
메타aiautomationdesign

정보

이 스킬은 A2A 프로토콜을 위한 JSON-RPC 2.0 서버를 구현하여 완전한 작업 생명주기 관리와 SSE 스트리밍을 제공합니다. 다중 에이전트 워크플로에서 상호 운용이 필요한 에이전트를 구축하거나 기존 서비스에 A2A 지원을 추가할 때 사용하세요. 에이전트 카드의 백엔드를 생성하거나 A2A 표준 준수를 보장하는 데 이상적입니다.

빠른 설치

Claude Code

추천
기본
npx skills add pjt222/agent-almanac -a claude-code
플러그인 명령대체
/plugin add https://github.com/pjt222/agent-almanac
Git 클론대체
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/implement-a2a-server

Claude Code에서 이 명령을 복사하여 붙여넣어 스킬을 설치하세요

문서

建 A2A 伺服器

建全合規之 A2A 伺服器,處理 JSON-RPC 2.0 請求、管任務生命週期狀態、支即時更新之 SSE 串流,並供 Agent Card 以利探索。

適用時機

  • 建參與多代理 A2A 工作流程之代理
  • 為以 design-a2a-agent-card 設計之 Agent Card 建後端
  • 為現有代理或服務加 A2A 協定支援
  • 為測試建參考 A2A 伺服器實作
  • 部署須與他 A2A 合規代理互通之代理

輸入

  • 必要:定代理技能與能力之 Agent Card(JSON)
  • 必要:實作語言(TypeScript/Node.js 或 Python)
  • 必要:Agent Card 定義之每技能之任務執行邏輯
  • 選擇性:推送通知 webhook 支援(truefalse
  • 選擇性:持久任務儲存(記憶體、Redis、PostgreSQL)
  • 選擇性:配 Agent Card 驗證機制之驗證中介軟體
  • 選擇性:最大並發任務限

步驟

步驟一:以 JSON-RPC 2.0 處理器設專案

1.1. 以 HTTP 伺服器與 JSON-RPC 解析初始化專案:

TypeScript:

mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
npm init -y
npm install express uuid
npm install -D typescript @types/node @types/express tsx

Python:

mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn uuid6

1.2. 建 JSON-RPC 2.0 請求處理器:

interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: string | number;
  method: string;
  params?: Record<string, unknown>;
}

interface JsonRpcResponse {
  jsonrpc: "2.0";
  id: string | number;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}

function handleJsonRpc(request: JsonRpcRequest): JsonRpcResponse {
  switch (request.method) {
    case "tasks/send":
      return handleTaskSend(request);
    case "tasks/get":
      return handleTaskGet(request);
    case "tasks/cancel":
      return handleTaskCancel(request);
    case "tasks/sendSubscribe":
      // Handled separately via SSE
      throw new Error("Use SSE endpoint for sendSubscribe");
    default:
      return {
        jsonrpc: "2.0",
        id: request.id,
        error: { code: -32601, message: `Method not found: ${request.method}` },
      };
  }
}

1.3. 將 JSON-RPC 處理器掛於 POST 端點(通常 /):

app.post("/", (req, res) => {
  const response = handleJsonRpc(req.body);
  res.json(response);
});

1.4. 於 /.well-known/agent.json 供 Agent Card:

app.get("/.well-known/agent.json", (req, res) => {
  res.json(agentCard);
});

預期: HTTP 伺服器接受 JSON-RPC 2.0 請求並供 Agent Card。

失敗時: 若 JSON-RPC 解析失敗,驗請求體有 jsonrpcmethodid 欄。對畸形 JSON 返 -32700(解析錯),對缺必要欄返 -32600(無效請求)。

步驟二:實任務狀態機

2.1. 定含所有 A2A 生命週期狀態之任務模型:

type TaskState =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed"
  | "canceled";

interface Task {
  id: string;
  sessionId: string;
  status: {
    state: TaskState;
    message?: Message;
    timestamp: string;
  };
  history?: TaskStatus[];
  artifacts?: Artifact[];
  metadata?: Record<string, unknown>;
}

interface Message {
  role: "user" | "agent";
  parts: Part[];
}

type Part =
  | { type: "text"; text: string }
  | { type: "file"; file: { name: string; mimeType: string; bytes?: string; uri?: string } }
  | { type: "data"; data: Record<string, unknown> };

2.2. 實狀態轉換規則:

submitted  -> working | failed | canceled
working    -> completed | failed | canceled | input-required
input-required -> working | failed | canceled
completed  -> (terminal)
failed     -> (terminal)
canceled   -> (terminal)

2.3. 建含 CRUD 操作之任務儲存:

class TaskStore {
  private tasks: Map<string, Task> = new Map();

  create(sessionId: string, message: Message): Task { ... }
  get(taskId: string): Task | undefined { ... }
  updateStatus(taskId: string, state: TaskState, message?: Message): Task { ... }
  addArtifact(taskId: string, artifact: Artifact): void { ... }
  cancel(taskId: string): Task { ... }
}

2.4. 若 Agent Card 啟 stateTransitionHistory,每狀態變附時間戳附加於任務之 history 陣列。

預期: 任務儲存強制有效狀態轉換並維歷史。

失敗時: 若試無效狀態轉換(如 completedworking),以碼 -32002 附描述訊息返 JSON-RPC 錯。永勿默忽無效轉換。

步驟三:加 tasks/send 與 tasks/get 方法

3.1. 實 tasks/send——提交任務之主方法:

function handleTaskSend(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, sessionId, message } = request.params as TaskSendParams;

  // Create or resume task
  let task = taskStore.get(taskId);
  if (!task) {
    task = taskStore.create(sessionId, message);
  } else if (task.status.state === "input-required") {
    taskStore.updateStatus(task.id, "working");
  }

  // Route to skill handler based on message content
  const skill = matchSkill(message);
  if (!skill) {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: "No matching skill for this request." }],
    });
    return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
  }

  // Execute skill (async — task will transition to working, then completed/failed)
  executeSkill(skill, task, message).catch((error) => {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: error.message }],
    });
  });

  return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}

3.2. 實 tasks/get——取任務狀態與工件:

function handleTaskGet(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, historyLength } = request.params as TaskGetParams;
  const task = taskStore.get(taskId);

  if (!task) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32001, message: `Task not found: ${taskId}` },
    };
  }

  // Optionally trim history to requested length
  const result = historyLength !== undefined
    ? { ...task, history: task.history?.slice(-historyLength) }
    : task;

  return { jsonrpc: "2.0", id: request.id, result };
}

3.3. 實 tasks/cancel

function handleTaskCancel(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId } = request.params as TaskCancelParams;
  try {
    const task = taskStore.cancel(taskId);
    return { jsonrpc: "2.0", id: request.id, result: task };
  } catch (error) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32002, message: (error as Error).message },
    };
  }
}

預期: 可用之 tasks/sendtasks/gettasks/cancel 方法,正確管任務生命週期。

失敗時: 若技能配對失敗,以描述訊息於 failed 態返任務。若任務儲存滿,返 -32003(資源耗盡)。

步驟四:為 tasks/sendSubscribe 實 SSE 串流

4.1. 建 SSE 端點以串流任務更新:

app.post("/subscribe", (req, res) => {
  const request = req.body as JsonRpcRequest;
  if (request.method !== "tasks/sendSubscribe") {
    res.status(400).json({ error: "Only tasks/sendSubscribe supported" });
    return;
  }

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const { id: taskId, sessionId, message } = request.params as TaskSendParams;
  let task = taskStore.get(taskId) ?? taskStore.create(sessionId, message);

  // Send initial status
  sendSSEEvent(res, "status", {
    id: request.id,
    result: { id: task.id, status: task.status },
  });

  // Subscribe to task updates
  const unsubscribe = taskStore.onUpdate(task.id, (updatedTask) => {
    if (updatedTask.status.state === "working") {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status },
      });
    }

    if (updatedTask.artifacts?.length) {
      sendSSEEvent(res, "artifact", {
        id: request.id,
        result: { id: updatedTask.id, artifact: updatedTask.artifacts.at(-1) },
      });
    }

    // Close stream on terminal states
    if (["completed", "failed", "canceled"].includes(updatedTask.status.state)) {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status, final: true },
      });
      unsubscribe();
      res.end();
    }
  });

  // Handle client disconnect
  req.on("close", () => {
    unsubscribe();
  });
});

function sendSSEEvent(res: Response, event: string, data: unknown): void {
  res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}

4.2. 於任務儲存加事件發射器或 pub/sub 機制:

class TaskStore {
  private listeners: Map<string, Set<(task: Task) => void>> = new Map();

  onUpdate(taskId: string, callback: (task: Task) => void): () => void {
    if (!this.listeners.has(taskId)) {
      this.listeners.set(taskId, new Set());
    }
    this.listeners.get(taskId)!.add(callback);
    return () => this.listeners.get(taskId)?.delete(callback);
  }

  private notifyListeners(taskId: string): void {
    const task = this.get(taskId);
    if (task) {
      this.listeners.get(taskId)?.forEach((cb) => cb(task));
    }
  }
}

4.3. 自所有任務狀態轉換與工件增加發事件。

預期: SSE 串流於任務進展時發即時狀態與工件事件。

失敗時: 若 SSE 連線斷,客戶端應可重連並用 tasks/get 取當前狀態。確任務儲存不依活動 SSE 連線。

步驟五:加推送通知 Webhook 支援

5.1. 若 Agent Card 啟 pushNotifications,經 tasks/pushNotification/set 實 webhook 註冊:

  • PushNotificationConfigurl(必 HTTPS)、選擇性 tokenevents 陣列(["status", "artifact"]
  • 驗 webhook URL 用 HTTPS;否則以錯碼 -32004 拒之
  • 儲存配置於任務儲存,以任務 ID 為鍵

5.2. 於任務狀態變時發 webhook 回呼:

  • 每狀態轉換或工件增加時核是否有註冊之推送配置
  • POST 含 taskIdeventTypestatustimestamp 之 JSON 負載至 webhook URL
  • 若供 token 則納 Authorization: Bearer <token> 標頭

5.3. 為失敗之 webhook 實重試邏輯(指數退避,最多三次重試)。

5.4. 加 tasks/pushNotification/get 以取任務當前推送配置。

預期: Webhook 註冊與送達附重試邏輯。

失敗時: 推送通知失敗不得影響任務執行。記錯並續。若 webhook URL 持續不可達,最大重試後移訂閱。

步驟六:與 Agent Card 整合以利探索

6.1. 啟動時載並供 Agent Card:

  • 解析 agent-card.json 並驗能力配實作
  • 若卡宣 streaming: true 而 SSE 未啟,啟動時拋錯
  • 若卡宣 pushNotifications: true 而 webhook 未啟,啟動時拋錯

6.2. 為跨源 Agent Card 探索加 CORS 標頭:

  • /.well-known/agent.jsonAccess-Control-Allow-Origin: *
  • GETOPTIONS 方法

6.3. 加配 Agent Card 機制之驗證中介軟體:

  • /.well-known/agent.json 之驗證(Agent Card 永為公開)
  • 所有他端點驗 Authorization 標頭或 API 金鑰
  • 未授權請求返 HTTP 401 附 JSON-RPC 錯碼 -32000

6.4. 啟動伺服器並端到端驗證:

# Start server
npm run dev

# Fetch Agent Card
curl -s http://localhost:3000/.well-known/agent.json | python3 -m json.tool

# Send a task
curl -X POST http://localhost:3000/ \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"tasks/send","params":{"id":"task-1","sessionId":"session-1","message":{"role":"user","parts":[{"type":"text","text":"Analyze my dataset"}]}}}'

預期: 執行中之 A2A 伺服器供其 Agent Card、接任務、管其全生命週期。

失敗時: 若 Agent Card 能力不配實作,6.1 之啟動驗證將捕此不配。修實作或更新 Agent Card 以配之。

驗證

  • 伺服器啟且於 /.well-known/agent.json 供 Agent Card
  • tasks/send 建任務並令其經生命週期轉換
  • tasks/get 取任務狀態與工件
  • tasks/cancel 將任務移至 canceled 態
  • SSE 串流發即時狀態與工件事件(若啟)
  • 推送通知於狀態變時送達 webhook(若啟)
  • 無效狀態轉換返合宜之 JSON-RPC 錯
  • 驗證拒未授權請求(若配置)
  • Agent Card 能力精確反映伺服器實作
  • 所有 JSON-RPC 響應含 jsonrpc: "2.0" 與正確 id

常見陷阱

  • 缺 JSON-RPC 錯碼:A2A 協定定特定錯碼。用 -32700(解析錯)、-32600(無效請求)、-32601(方法未覓)與領域錯之自訂碼
  • 任務 ID 衝突:用 UUID 為任務 ID。若客戶端供 ID,建任務前驗唯一性
  • SSE 連線洩漏:客戶端斷時永清 SSE 訂閱。用 req.on("close") 偵斷
  • 阻塞技能執行:長時技能須異步執行。即刻以 submittedworking 態返任務,再經事件更新
  • Agent Card 漂移:若伺服器實作變而 Agent Card 未更新,客戶端將有錯期望。啟動時驗
  • 忽視終態:任務至 completedfailedcanceled 後不允進一步狀態轉換。於狀態機中守之

相關技能

  • design-a2a-agent-card - 設計此伺服器所實之 Agent Card
  • test-a2a-interop - 對 A2A 合規測試驗伺服器
  • build-custom-mcp-server - MCP 伺服器模式引導 A2A 實作
  • scaffold-mcp-server - 適用於 A2A 伺服器設置之骨架模式
  • configure-ingress-networking - 附 TLS 與路由之生產部署

GitHub 저장소

pjt222/agent-almanac
경로: i18n/wenyan-lite/skills/implement-a2a-server
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

연관 스킬

content-collections

메타

이 스킬은 콘텐츠 콜렉션(Content Collections)을 위한 프로덕션 검증된 설정을 제공합니다. 콘텐츠 콜렉션은 Markdown/MDX 파일을 Zod 검증이 포함된 타입 안전한 데이터 콜렉션으로 변환해주는 TypeScript 최우선 도구입니다. 블로그, 문서 사이트 또는 콘텐츠 중심의 Vite + React 애플리케이션을 구축할 때 타입 안전성과 자동 콘텐츠 검증을 보장하기 위해 사용하세요. Vite 플러그인 구성과 MDX 컴파일부터 배포 최적화 및 스키마 검증에 이르기까지 모든 것을 다룹니다.

스킬 보기

polymarket

메타

이 스킬은 개발자들이 Polymarket 예측 시장 플랫폼을 활용한 애플리케이션을 구축할 수 있도록 지원하며, 거래 및 시장 데이터를 위한 API 통합 기능을 포함합니다. 또한 WebSocket을 통한 실시간 데이터 스트리밍을 제공하여 실시간 거래와 시장 활동을 모니터링할 수 있습니다. 이를 통해 거래 전략을 구현하거나 실시간 시장 업데이트를 처리하는 도구를 생성하는 데 활용할 수 있습니다.

스킬 보기

creating-opencode-plugins

메타

이 스킬은 개발자들이 명령어, 파일, LSP 작업 등 25개 이상의 이벤트 유형에 연결되는 OpenCode 플러그인을 만들 수 있도록 돕습니다. JavaScript/TypeScript 모듈을 위한 플러그인 구조, 이벤트 API 명세, 구현 패턴을 제공합니다. OpenCode AI 어시스턴트의 라이프사이클을 사용자 정의 이벤트 기반 로직으로 가로채거나, 모니터링하거나, 확장해야 할 때 사용하세요.

스킬 보기

sglang

메타

SGLang은 RadixAttention 프리픽스 캐싱을 활용하여 JSON, 정규식, 에이전트 워크플로우를 위한 고속 구조화 생성에 특화된 고성능 LLM 서빙 프레임워크입니다. 특히 반복되는 프리픽스가 있는 작업에서 상당히 빠른 추론 속도를 제공하여 복잡한 구조화 출력 및 다중 턴 대화에 이상적입니다. 제약 디코딩이 필요하거나 광범위한 프리픽스 공유가 있는 애플리케이션을 구축할 때는 vLLM과 같은 대안보다 SGLang을 선택하십시오.

스킬 보기