スキル一覧に戻る

implement-a2a-server

pjt222
更新日 2 days ago
2 閲覧
17
2
17
GitHubで表示
メタaiautomationdesign

について

このスキルは、マルチエージェントワークフロー向けのA2Aプロトコル準拠のJSON-RPC 2.0サーバーを開発者が実装することを支援します。完全なタスクライフサイクル管理、SSEストリーミング、プッシュ通知を提供し、エージェントがA2Aエコシステム内で相互運用できるようにします。Agent Cardsのバックエンド構築時や、既存のエージェントやサービスにA2Aサポートを追加する際にご利用ください。

クイックインストール

Claude Code

推奨
メイン
npx skills add pjt222/agent-almanac -a claude-code
プラグインコマンド代替
/plugin add https://github.com/pjt222/agent-almanac
Git クローン代替
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/implement-a2a-server

このコマンドをClaude Codeにコピー&ペーストしてスキルをインストールします

ドキュメント

Implement A2A Server

A2A server: JSON-RPC 2.0 + task lifecycle + SSE + Agent Card discovery.

Use When

  • Agent in multi-agent A2A workflow
  • Backend for Agent Card (design-a2a-agent-card)
  • Add A2A to existing agent/service
  • Reference server for testing
  • Deploy interoperable A2A agent

In

  • Required: Agent Card (JSON) defining skills + capabilities
  • Required: impl lang (TS/Node.js or Python)
  • Required: task exec logic per skill
  • Optional: push notification webhooks (bool)
  • Optional: persistent task store (memory, Redis, Postgres)
  • Optional: auth middleware matching Agent Card
  • Optional: max concurrent tasks

Do

Step 1: Setup JSON-RPC 2.0 handler

1.1. Init project w/ HTTP + JSON-RPC parsing:

TS:

mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
npm init -y
npm install express uuid
npm install -D typescript @types/node @types/express tsx

Python:

mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn uuid6

1.2. JSON-RPC 2.0 req handler:

interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: string | number;
  method: string;
  params?: Record<string, unknown>;
}

interface JsonRpcResponse {
  jsonrpc: "2.0";
  id: string | number;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}

function handleJsonRpc(request: JsonRpcRequest): JsonRpcResponse {
  switch (request.method) {
    case "tasks/send":
      return handleTaskSend(request);
    case "tasks/get":
      return handleTaskGet(request);
    case "tasks/cancel":
      return handleTaskCancel(request);
    case "tasks/sendSubscribe":
      // Handled separately via SSE
      throw new Error("Use SSE endpoint for sendSubscribe");
    default:
      return {
        jsonrpc: "2.0",
        id: request.id,
        error: { code: -32601, message: `Method not found: ${request.method}` },
      };
  }
}

1.3. Mount on POST endpoint (usually /):

app.post("/", (req, res) => {
  const response = handleJsonRpc(req.body);
  res.json(response);
});

1.4. Serve Agent Card at /.well-known/agent.json:

app.get("/.well-known/agent.json", (req, res) => {
  res.json(agentCard);
});

→ HTTP server accepts JSON-RPC 2.0 + serves Agent Card.

If err: parsing fails → validate req body has jsonrpc, method, id. Return -32700 (Parse error) for malformed JSON, -32600 (Invalid Request) for missing fields.

Step 2: Task state machine

2.1. Task model w/ all A2A lifecycle states:

type TaskState =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed"
  | "canceled";

interface Task {
  id: string;
  sessionId: string;
  status: {
    state: TaskState;
    message?: Message;
    timestamp: string;
  };
  history?: TaskStatus[];
  artifacts?: Artifact[];
  metadata?: Record<string, unknown>;
}

interface Message {
  role: "user" | "agent";
  parts: Part[];
}

type Part =
  | { type: "text"; text: string }
  | { type: "file"; file: { name: string; mimeType: string; bytes?: string; uri?: string } }
  | { type: "data"; data: Record<string, unknown> };

2.2. State transitions:

submitted  -> working | failed | canceled
working    -> completed | failed | canceled | input-required
input-required -> working | failed | canceled
completed  -> (terminal)
failed     -> (terminal)
canceled   -> (terminal)

2.3. Task store w/ CRUD:

class TaskStore {
  private tasks: Map<string, Task> = new Map();

  create(sessionId: string, message: Message): Task { ... }
  get(taskId: string): Task | undefined { ... }
  updateStatus(taskId: string, state: TaskState, message?: Message): Task { ... }
  addArtifact(taskId: string, artifact: Artifact): void { ... }
  cancel(taskId: string): Task { ... }
}

2.4. stateTransitionHistory enabled → append each status change to history w/ timestamps.

→ Task store enforces valid transitions + maintains history.

If err: invalid transition (completed → working) → JSON-RPC err code -32002 + descriptive msg. NEVER silently ignore.

Step 3: tasks/send + tasks/get

3.1. tasks/send (primary):

function handleTaskSend(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, sessionId, message } = request.params as TaskSendParams;

  // Create or resume task
  let task = taskStore.get(taskId);
  if (!task) {
    task = taskStore.create(sessionId, message);
  } else if (task.status.state === "input-required") {
    taskStore.updateStatus(task.id, "working");
  }

  // Route to skill handler based on message content
  const skill = matchSkill(message);
  if (!skill) {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: "No matching skill for this request." }],
    });
    return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
  }

  // Execute skill (async — task will transition to working, then completed/failed)
  executeSkill(skill, task, message).catch((error) => {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: error.message }],
    });
  });

  return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}

3.2. tasks/get:

function handleTaskGet(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, historyLength } = request.params as TaskGetParams;
  const task = taskStore.get(taskId);

  if (!task) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32001, message: `Task not found: ${taskId}` },
    };
  }

  // Optionally trim history to requested length
  const result = historyLength !== undefined
    ? { ...task, history: task.history?.slice(-historyLength) }
    : task;

  return { jsonrpc: "2.0", id: request.id, result };
}

3.3. tasks/cancel:

function handleTaskCancel(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId } = request.params as TaskCancelParams;
  try {
    const task = taskStore.cancel(taskId);
    return { jsonrpc: "2.0", id: request.id, result: task };
  } catch (error) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32002, message: (error as Error).message },
    };
  }
}

→ Working tasks/send, tasks/get, tasks/cancel manage lifecycle.

If err: skill match fails → task in failed state w/ descriptive msg. Store full → -32003 (resource exhausted).

Step 4: SSE for tasks/sendSubscribe

4.1. SSE endpoint:

app.post("/subscribe", (req, res) => {
  const request = req.body as JsonRpcRequest;
  if (request.method !== "tasks/sendSubscribe") {
    res.status(400).json({ error: "Only tasks/sendSubscribe supported" });
    return;
  }

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const { id: taskId, sessionId, message } = request.params as TaskSendParams;
  let task = taskStore.get(taskId) ?? taskStore.create(sessionId, message);

  // Send initial status
  sendSSEEvent(res, "status", {
    id: request.id,
    result: { id: task.id, status: task.status },
  });

  // Subscribe to task updates
  const unsubscribe = taskStore.onUpdate(task.id, (updatedTask) => {
    if (updatedTask.status.state === "working") {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status },
      });
    }

    if (updatedTask.artifacts?.length) {
      sendSSEEvent(res, "artifact", {
        id: request.id,
        result: { id: updatedTask.id, artifact: updatedTask.artifacts.at(-1) },
      });
    }

    // Close stream on terminal states
    if (["completed", "failed", "canceled"].includes(updatedTask.status.state)) {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status, final: true },
      });
      unsubscribe();
      res.end();
    }
  });

  // Handle client disconnect
  req.on("close", () => {
    unsubscribe();
  });
});

function sendSSEEvent(res: Response, event: string, data: unknown): void {
  res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}

4.2. Event emitter / pub-sub in task store:

class TaskStore {
  private listeners: Map<string, Set<(task: Task) => void>> = new Map();

  onUpdate(taskId: string, callback: (task: Task) => void): () => void {
    if (!this.listeners.has(taskId)) {
      this.listeners.set(taskId, new Set());
    }
    this.listeners.get(taskId)!.add(callback);
    return () => this.listeners.get(taskId)?.delete(callback);
  }

  private notifyListeners(taskId: string): void {
    const task = this.get(taskId);
    if (task) {
      this.listeners.get(taskId)?.forEach((cb) => cb(task));
    }
  }
}

4.3. Emit events from all state transitions + artifact additions.

→ SSE streams real-time status + artifact events.

If err: SSE drops → client can reconnect + use tasks/get for current state. Store must not depend on active SSE.

Step 5: Push webhook support

5.1. If pushNotifications in Agent Card → impl webhook registration via tasks/pushNotification/set:

  • Accept PushNotificationConfig w/ url (HTTPS req'd), opt token, events array (["status", "artifact"])
  • Validate HTTPS → reject w/ -32004 otherwise
  • Store config in task store, keyed by task ID

5.2. Webhook callbacks on state changes:

  • Each transition / artifact → check registered config
  • POST JSON payload w/ taskId, eventType, status, timestamp
  • Authorization: Bearer <token> if provided

5.3. Retry logic (exponential backoff, max 3).

5.4. tasks/pushNotification/get retrieves config.

→ Webhook registration + delivery w/ retry.

If err: push failures MUST NOT affect task exec. Log + continue. Persistent unreachable → remove after max retries.

Step 6: Agent Card for discovery

6.1. Load + serve at startup:

  • Parse agent-card.json + validate capabilities match impl
  • Throw startup if card advertises streaming: true but SSE disabled
  • Throw if pushNotifications: true but webhooks disabled

6.2. CORS for cross-origin discovery:

  • Access-Control-Allow-Origin: * on /.well-known/agent.json
  • Allow GET + OPTIONS

6.3. Auth middleware per card scheme:

  • Skip auth on /.well-known/agent.json (always public)
  • Other endpoints → validate Authorization / API key
  • HTTP 401 + JSON-RPC -32000 for unauthorized

6.4. Start + verify E2E:

# Start server
npm run dev

# Fetch Agent Card
curl -s http://localhost:3000/.well-known/agent.json | python3 -m json.tool

# Send a task
curl -X POST http://localhost:3000/ \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"tasks/send","params":{"id":"task-1","sessionId":"session-1","message":{"role":"user","parts":[{"type":"text","text":"Analyze my dataset"}]}}}'

→ Running server serves Agent Card + accepts tasks + manages lifecycle.

If err: capabilities mismatch impl → startup validation (6.1) catches. Fix impl or update card.

Check

  • Server serves Agent Card at /.well-known/agent.json
  • tasks/send creates + transitions lifecycle
  • tasks/get retrieves status + artifacts
  • tasks/cancel → canceled
  • SSE sends real-time status + artifact (if enabled)
  • Push webhooks deliver on state changes (if enabled)
  • Invalid transitions → JSON-RPC errors
  • Auth rejects unauthorized
  • Card capabilities match impl
  • All JSON-RPC responses include jsonrpc: "2.0" + correct id

Traps

  • Missing JSON-RPC codes: A2A defines specific. Use -32700 (parse), -32600 (invalid req), -32601 (method not found), custom for domain errors.
  • Task ID collisions: UUIDs. Client-provided → validate uniqueness.
  • SSE leaks: clean up subscriptions on disconnect. req.on("close") detects.
  • Blocking skill exec: long-running → async. Return submitted/working immediately, update via events.
  • Agent Card drift: impl changes, card not updated → wrong client expectations. Validate at startup.
  • Ignore terminal states: completed/failed/canceled → no further transitions. Guard in state machine.

  • design-a2a-agent-card — design the card this server implements
  • test-a2a-interop — validate against A2A conformance tests
  • build-custom-mcp-server — MCP patterns inform A2A impl
  • scaffold-mcp-server — scaffolding applicable
  • configure-ingress-networking — prod deploy w/ TLS + routing

GitHub リポジトリ

pjt222/agent-almanac
パス: i18n/caveman-ultra/skills/implement-a2a-server
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

関連スキル

content-collections

メタ

このスキルは、Content Collections(Markdown/MDXファイルを型安全なデータコレクションに変換するTypeScriptファーストのツール)の本番環境でテストされた設定を提供します。Zodバリデーションによる型安全性を実現し、ブログ、ドキュメントサイト、コンテンツ重視のVite + Reactアプリケーション構築時にご利用ください。Viteプラグインの設定、MDXコンパイルから、デプロイ最適化、スキーマバリデーションまで、すべてを網羅しています。

スキルを見る

polymarket

メタ

このスキルは、開発者がPolymarket予測市場プラットフォームを活用したアプリケーション構築を可能にします。API統合による取引や市場データの取得に加え、WebSocketを介したリアルタイムデータストリーミングにより、ライブ取引や市場活動を監視できます。取引戦略の実装や、ライブ市場更新を処理するツールの作成にご利用ください。

スキルを見る

creating-opencode-plugins

メタ

このスキルは、開発者がコマンド、ファイル、LSP操作など25種類以上のイベントタイプにフックするOpenCodeプラグインを作成することを支援します。JavaScript/TypeScriptモジュール向けに、プラグイン構造、イベントAPI仕様、および実装パターンを提供します。カスタムイベント駆動ロジックでOpenCode AIアシスタントのライフサイクルをインターセプト、監視、または拡張する必要がある場合にご利用ください。

スキルを見る

sglang

メタ

SGLangは、高性能なLLMサービングフレームワークであり、RadixAttentionプレフィックスキャッシュを活用したJSON、正規表現、エージェントワークフロー向けの高速で構造化された生成を特長とします。特にプレフィックスが繰り返されるタスクにおいて、大幅に高速な推論を実現し、複雑な構造化出力やマルチターン対話に最適です。制約付きデコードが必要な場合や、広範なプレフィックス共有を伴うアプリケーションを構築する場合は、vLLMなどの代替案ではなくSGLangを選択してください。

スキルを見る