implement-a2a-server
À propos
Cette compétence implémente un serveur A2A JSON-RPC 2.0 avec une gestion complète du cycle de vie des tâches, du streaming SSE et des notifications push. Utilisez-la pour créer des backends d'agent pour des workflows multi-agents, ajouter la prise en charge du protocole A2A à des services existants, ou déployer des agents devant interopérer avec d'autres systèmes conformes à A2A.
Installation rapide
Claude Code
Recommandénpx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/implement-a2a-serverCopiez et collez cette commande dans Claude Code pour installer cette compétence
Documentation
A2Aサーバーの実装
JSON-RPC 2.0リクエストを処理し、タスクのライフサイクル状態を管理し、リアルタイム更新のためのSSEストリーミングをサポートし、ディスカバリ用のAgent Cardを提供する完全準拠のA2Aサーバーを構築する。
使用タイミング
- マルチエージェントA2Aワークフローに参加するエージェントを実装する時
design-a2a-agent-cardで設計されたAgent Cardのバックエンドを構築する時- 既存のエージェントやサービスにA2Aプロトコルサポートを追加する時
- テスト用のリファレンスA2Aサーバー実装を作成する時
- 他のA2A準拠エージェントと相互運用する必要があるエージェントをデプロイする時
入力
- 必須: エージェントのスキルと機能を定義するAgent Card(JSON)
- 必須: 実装言語(TypeScript/Node.jsまたはPython)
- 必須: Agent Cardで定義された各スキルのタスク実行ロジック
- 任意: プッシュ通知Webhookサポート(
trueまたはfalse) - 任意: 永続的タスクストア(インメモリ、Redis、PostgreSQL)
- 任意: Agent Cardの認証スキームに合致する認証ミドルウェア
- 任意: 最大同時タスク数制限
手順
ステップ1: JSON-RPC 2.0ハンドラーを備えたプロジェクトのセットアップ
1.1. HTTPサーバーとJSON-RPCパーシングでプロジェクトを初期化する:
TypeScript:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
npm init -y
npm install express uuid
npm install -D typescript @types/node @types/express tsx
Python:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn uuid6
1.2. JSON-RPC 2.0リクエストハンドラーを作成する:
interface JsonRpcRequest {
jsonrpc: "2.0";
id: string | number;
method: string;
params?: Record<string, unknown>;
}
interface JsonRpcResponse {
jsonrpc: "2.0";
id: string | number;
result?: unknown;
error?: { code: number; message: string; data?: unknown };
}
function handleJsonRpc(request: JsonRpcRequest): JsonRpcResponse {
switch (request.method) {
case "tasks/send":
return handleTaskSend(request);
case "tasks/get":
return handleTaskGet(request);
case "tasks/cancel":
return handleTaskCancel(request);
case "tasks/sendSubscribe":
// Handled separately via SSE
throw new Error("Use SSE endpoint for sendSubscribe");
default:
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32601, message: `Method not found: ${request.method}` },
};
}
}
1.3. JSON-RPCハンドラーをPOSTエンドポイント(通常 /)にマウントする:
app.post("/", (req, res) => {
const response = handleJsonRpc(req.body);
res.json(response);
});
1.4. Agent Cardを /.well-known/agent.json で提供する:
app.get("/.well-known/agent.json", (req, res) => {
res.json(agentCard);
});
期待結果: JSON-RPC 2.0リクエストを受け付け、Agent Cardを提供するHTTPサーバー。
失敗時: JSON-RPCのパースが失敗する場合、リクエストボディに jsonrpc、method、id フィールドがあることを確認する。不正なJSONには -32700(Parse error)を、必須フィールドの欠落には -32600(Invalid Request)を返す。
ステップ2: タスクステートマシンの実装
2.1. すべてのA2Aライフサイクル状態を持つタスクモデルを定義する:
type TaskState =
| "submitted"
| "working"
| "input-required"
| "completed"
| "failed"
| "canceled";
interface Task {
id: string;
sessionId: string;
status: {
state: TaskState;
message?: Message;
timestamp: string;
};
history?: TaskStatus[];
artifacts?: Artifact[];
metadata?: Record<string, unknown>;
}
interface Message {
role: "user" | "agent";
parts: Part[];
}
type Part =
| { type: "text"; text: string }
| { type: "file"; file: { name: string; mimeType: string; bytes?: string; uri?: string } }
| { type: "data"; data: Record<string, unknown> };
2.2. 状態遷移ルールを実装する:
submitted -> working | failed | canceled
working -> completed | failed | canceled | input-required
input-required -> working | failed | canceled
completed -> (terminal)
failed -> (terminal)
canceled -> (terminal)
2.3. CRUD操作を持つタスクストアを作成する:
class TaskStore {
private tasks: Map<string, Task> = new Map();
create(sessionId: string, message: Message): Task { ... }
get(taskId: string): Task | undefined { ... }
updateStatus(taskId: string, state: TaskState, message?: Message): Task { ... }
addArtifact(taskId: string, artifact: Artifact): void { ... }
cancel(taskId: string): Task { ... }
}
2.4. Agent Cardで stateTransitionHistory が有効な場合、各状態変更をタイムスタンプ付きでタスクの history 配列に追加する。
期待結果: 有効な状態遷移を強制し履歴を維持するタスクストア。
失敗時: 無効な状態遷移が試みられた場合(例:completed から working)、コード -32002 と説明的なメッセージのJSON-RPCエラーを返す。無効な遷移を黙って無視しない。
ステップ3: tasks/sendとtasks/getメソッドの追加
3.1. tasks/send を実装する — タスク送信のためのプライマリメソッド:
function handleTaskSend(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId, sessionId, message } = request.params as TaskSendParams;
// Create or resume task
let task = taskStore.get(taskId);
if (!task) {
task = taskStore.create(sessionId, message);
} else if (task.status.state === "input-required") {
taskStore.updateStatus(task.id, "working");
}
// Route to skill handler based on message content
const skill = matchSkill(message);
if (!skill) {
taskStore.updateStatus(task.id, "failed", {
role: "agent",
parts: [{ type: "text", text: "No matching skill for this request." }],
});
return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}
// Execute skill (async — task will transition to working, then completed/failed)
executeSkill(skill, task, message).catch((error) => {
taskStore.updateStatus(task.id, "failed", {
role: "agent",
parts: [{ type: "text", text: error.message }],
});
});
return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}
3.2. tasks/get を実装する — タスクのステータスとアーティファクトの取得:
function handleTaskGet(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId, historyLength } = request.params as TaskGetParams;
const task = taskStore.get(taskId);
if (!task) {
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32001, message: `Task not found: ${taskId}` },
};
}
// Optionally trim history to requested length
const result = historyLength !== undefined
? { ...task, history: task.history?.slice(-historyLength) }
: task;
return { jsonrpc: "2.0", id: request.id, result };
}
3.3. tasks/cancel を実装する:
function handleTaskCancel(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId } = request.params as TaskCancelParams;
try {
const task = taskStore.cancel(taskId);
return { jsonrpc: "2.0", id: request.id, result: task };
} catch (error) {
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32002, message: (error as Error).message },
};
}
}
期待結果: タスクのライフサイクルを正しく管理する動作する tasks/send、tasks/get、tasks/cancel メソッド。
失敗時: スキルマッチングが失敗した場合、説明的なメッセージ付きの failed 状態でタスクを返す。タスクストアが満杯の場合、-32003(resource exhausted)を返す。
ステップ4: tasks/sendSubscribe用のSSEストリーミングの実装
4.1. タスク更新のストリーミング用SSEエンドポイントを作成する:
app.post("/subscribe", (req, res) => {
const request = req.body as JsonRpcRequest;
if (request.method !== "tasks/sendSubscribe") {
res.status(400).json({ error: "Only tasks/sendSubscribe supported" });
return;
}
// Set SSE headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const { id: taskId, sessionId, message } = request.params as TaskSendParams;
let task = taskStore.get(taskId) ?? taskStore.create(sessionId, message);
// Send initial status
sendSSEEvent(res, "status", {
id: request.id,
result: { id: task.id, status: task.status },
});
// Subscribe to task updates
const unsubscribe = taskStore.onUpdate(task.id, (updatedTask) => {
if (updatedTask.status.state === "working") {
sendSSEEvent(res, "status", {
id: request.id,
result: { id: updatedTask.id, status: updatedTask.status },
});
}
if (updatedTask.artifacts?.length) {
sendSSEEvent(res, "artifact", {
id: request.id,
result: { id: updatedTask.id, artifact: updatedTask.artifacts.at(-1) },
});
}
// Close stream on terminal states
if (["completed", "failed", "canceled"].includes(updatedTask.status.state)) {
sendSSEEvent(res, "status", {
id: request.id,
result: { id: updatedTask.id, status: updatedTask.status, final: true },
});
unsubscribe();
res.end();
}
});
// Handle client disconnect
req.on("close", () => {
unsubscribe();
});
});
function sendSSEEvent(res: Response, event: string, data: unknown): void {
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}
4.2. タスクストアにイベントエミッターまたはpub/subメカニズムを追加する:
class TaskStore {
private listeners: Map<string, Set<(task: Task) => void>> = new Map();
onUpdate(taskId: string, callback: (task: Task) => void): () => void {
if (!this.listeners.has(taskId)) {
this.listeners.set(taskId, new Set());
}
this.listeners.get(taskId)!.add(callback);
return () => this.listeners.get(taskId)?.delete(callback);
}
private notifyListeners(taskId: string): void {
const task = this.get(taskId);
if (task) {
this.listeners.get(taskId)?.forEach((cb) => cb(task));
}
}
}
4.3. すべてのタスク状態遷移とアーティファクト追加からイベントを発行する。
期待結果: タスクの進行に伴いリアルタイムのステータスとアーティファクトイベントを送信するSSEストリーミング。
失敗時: SSE接続が切断された場合、クライアントは再接続して tasks/get を使用して現在の状態を取得できるべき。タスクストアがアクティブなSSE接続に依存しないことを確認する。
ステップ5: プッシュ通知Webhookサポートの追加
5.1. Agent Cardで pushNotifications が有効な場合、tasks/pushNotification/set によるWebhook登録を実装する:
url(HTTPS必須)、任意のtoken、events配列(["status", "artifact"])を含むPushNotificationConfigを受け付ける- WebhookのURLがHTTPSを使用していることを検証する; そうでなければエラーコード
-32004で拒否する - 設定をタスクIDでキー付けされたタスクストアに格納する
5.2. タスクの状態変更時にWebhookコールバックを送信する:
- 各状態遷移またはアーティファクト追加時に、登録されたプッシュ設定を確認する
taskId、eventType、status、timestampを含むJSONペイロードをWebhook URLにPOSTする- トークンが提供されている場合、
Authorization: Bearer <token>ヘッダーを含める
5.3. 失敗したWebhookのリトライロジックを実装する(指数バックオフ、最大3回リトライ)。
5.4. タスクの現在のプッシュ設定を取得する tasks/pushNotification/get を追加する。
期待結果: リトライロジックを備えたWebhook登録と配信。
失敗時: プッシュ通知の失敗はタスクの実行に影響してはならない。エラーをログに記録して続行する。Webhook URLが永続的に到達不能な場合、最大リトライ後にサブスクリプションを削除する。
ステップ6: ディスカバリのためのAgent Cardとの統合
6.1. 起動時にAgent Cardを読み込んで提供する:
agent-card.jsonをパースし、機能が実装と一致することを検証する- カードが
streaming: trueを宣伝しているがSSEが有効でない場合、起動時にスローする - カードが
pushNotifications: trueを宣伝しているがWebhookが有効でない場合、起動時にスローする
6.2. クロスオリジンAgent Cardディスカバリ用のCORSヘッダーを追加する:
/.well-known/agent.jsonにAccess-Control-Allow-Origin: *を設定するGETとOPTIONSメソッドを許可する
6.3. Agent Cardのスキームに合致する認証ミドルウェアを追加する:
/.well-known/agent.jsonの認証をスキップする(Agent Cardは常にパブリック)- 他のすべてのエンドポイントでは、
AuthorizationヘッダーまたはAPIキーを検証する - 未認証のリクエストに対しJSON-RPCエラーコード
-32000でHTTP 401を返す
6.4. サーバーを起動してエンドツーエンドで検証する:
# Start server
npm run dev
# Fetch Agent Card
curl -s http://localhost:3000/.well-known/agent.json | python3 -m json.tool
# Send a task
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tasks/send","params":{"id":"task-1","sessionId":"session-1","message":{"role":"user","parts":[{"type":"text","text":"Analyze my dataset"}]}}}'
期待結果: Agent Cardを提供し、タスクを受け付け、完全なライフサイクルを管理する実行中のA2Aサーバー。
失敗時: Agent Cardの機能が実装と一致しない場合、6.1の起動時検証が不一致をキャッチする。実装を修正するかAgent Cardを更新して一致させる。
バリデーション
- サーバーが起動し
/.well-known/agent.jsonでAgent Cardを提供する -
tasks/sendがタスクを作成しライフサイクルを通じて遷移させる -
tasks/getがタスクのステータスとアーティファクトを取得する -
tasks/cancelがタスクをcanceled状態に移行する - SSEストリーミングがリアルタイムのステータスとアーティファクトイベントを送信する(有効な場合)
- プッシュ通知が状態変更時にWebhookを配信する(有効な場合)
- 無効な状態遷移が適切なJSON-RPCエラーを返す
- 認証が未認証リクエストを拒否する(設定されている場合)
- Agent Cardの機能がサーバーの実装を正確に反映する
- すべてのJSON-RPCレスポンスが
jsonrpc: "2.0"と正しいidを含む
よくある落とし穴
- JSON-RPCエラーコードの欠落: A2Aプロトコルは特定のエラーコードを定義する。
-32700(parse error)、-32600(invalid request)、-32601(method not found)、ドメインエラー用のカスタムコードを使用する - タスクIDの衝突: タスクIDにUUIDを使用する。クライアントがIDを提供する場合、タスク作成前に一意性を検証する
- SSE接続のリーク: クライアントが切断した時は常にSSEサブスクリプションをクリーンアップする。切断検出には
req.on("close")を使用する - スキル実行のブロッキング: 長時間実行されるスキルは非同期で実行する必要がある。即座に
submittedまたはworking状態でタスクを返し、イベント経由で更新する - Agent Cardのドリフト: サーバー実装が変更されたがAgent Cardが更新されない場合、クライアントは不正確な期待を持つ。起動時に検証する
- 終端状態の無視: タスクが
completed、failed、canceledに達したら、それ以上の状態遷移は許可されない。ステートマシンでこれを防御する
関連スキル
design-a2a-agent-card— このサーバーが実装するAgent Cardを設計するtest-a2a-interop— A2A適合性テストに対してサーバーを検証するbuild-custom-mcp-server— A2A実装に情報を提供するMCPサーバーパターンscaffold-mcp-server— A2Aサーバーセットアップに適用可能なスキャフォールディングパターンconfigure-ingress-networking— TLSとルーティングを備えた本番デプロイ
Dépôt GitHub
Compétences associées
executing-plans
DesignUtilisez la compétence executing-plans lorsque vous disposez d'un plan de mise en œuvre complet à exécuter par lots contrôlés avec des points de contrôle de revue. Elle charge et examine le plan de manière critique, puis exécute les tâches par petits lots (3 tâches par défaut) tout en rapportant la progression entre chaque lot pour une revue par l'architecte. Cela garantit une mise en œuvre systématique avec des points de contrôle de qualité intégrés.
requesting-code-review
DesignCette compétence délègue un sous-agent réviseur de code pour analyser les modifications apportées au code par rapport aux exigences avant de poursuivre. Elle doit être utilisée après avoir terminé des tâches, implémenté des fonctionnalités majeures, ou avant une fusion vers la branche principale. La revue aide à détecter précocement les problèmes en comparant l'implémentation actuelle avec le plan initial.
connect-mcp-server
DesignCette compétence fournit un guide complet permettant aux développeurs de connecter des serveurs MCP à Claude Code via les transports HTTP, stdio ou SSE. Elle couvre l'installation, la configuration, l'authentification et la sécurité pour intégrer des services externes tels que GitHub, Notion et des API personnalisées. Utilisez-la lors de la configuration d'intégrations MCP, de la configuration d'outils externes ou du travail avec le Protocole de Contexte de Modèle de Claude.
web-cli-teleport
DesignCette compétence aide les développeurs à choisir entre les interfaces Web et CLI de Claude Code en fonction de l'analyse des tâches, puis permet une téléportation transparente des sessions entre ces environnements. Elle optimise le flux de travail en gérant l'état et le contexte de la session lors du passage entre le web, la CLI ou le mobile. Utilisez-la pour des projets complexes nécessitant différents outils à diverses étapes.
