implement-a2a-server
Info
This skill helps developers implement a JSON-RPC 2.0 server that complies with the A2A protocol for multi-agent workflows. It provides full task lifecycle management, SSE streaming, and push notifications so that agents can interoperate within the A2A ecosystem. Use it when building the backend for an Agent Card or adding A2A support to an existing agent or service.
Quick install
Claude Code
Recommended:
npx skills add pjt222/agent-almanac -a claude-code
/plugin add https://github.com/pjt222/agent-almanac
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/implement-a2a-server
Copy and paste this command into Claude Code to install the skill.
Documentation
Implement A2A Server
A2A server: JSON-RPC 2.0 + task lifecycle + SSE + Agent Card discovery.
Use When
- Agent in multi-agent A2A workflow
- Backend for Agent Card (design-a2a-agent-card)
- Add A2A to existing agent/service
- Reference server for testing
- Deploy interoperable A2A agent
In
- Required: Agent Card (JSON) defining skills + capabilities
- Required: impl lang (TS/Node.js or Python)
- Required: task exec logic per skill
- Optional: push notification webhooks (bool)
- Optional: persistent task store (memory, Redis, Postgres)
- Optional: auth middleware matching Agent Card
- Optional: max concurrent tasks
Do
Step 1: Setup JSON-RPC 2.0 handler
1.1. Init project w/ HTTP + JSON-RPC parsing:
TS:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
npm init -y
npm install express uuid
npm install -D typescript @types/node @types/express tsx
Python:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn uuid6
1.2. JSON-RPC 2.0 req handler:
interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: string | number;
  method: string;
  params?: Record<string, unknown>;
}
interface JsonRpcResponse {
  jsonrpc: "2.0";
  id: string | number;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}
function handleJsonRpc(request: JsonRpcRequest): JsonRpcResponse {
  switch (request.method) {
    case "tasks/send":
      return handleTaskSend(request);
    case "tasks/get":
      return handleTaskGet(request);
    case "tasks/cancel":
      return handleTaskCancel(request);
    case "tasks/sendSubscribe":
      // Streaming is served by the SSE endpoint. Return a JSON-RPC error
      // instead of throwing so the POST route still sends a valid response.
      return {
        jsonrpc: "2.0",
        id: request.id,
        error: { code: -32600, message: "Use SSE endpoint for sendSubscribe" },
      };
    default:
      return {
        jsonrpc: "2.0",
        id: request.id,
        error: { code: -32601, message: `Method not found: ${request.method}` },
      };
  }
}
1.3. Mount on POST endpoint (usually /):
app.post("/", (req, res) => {
  const response = handleJsonRpc(req.body);
  res.json(response);
});
1.4. Serve Agent Card at /.well-known/agent.json:
app.get("/.well-known/agent.json", (req, res) => {
  res.json(agentCard);
});
→ HTTP server accepts JSON-RPC 2.0 + serves Agent Card.
If err: parsing fails → validate req body has jsonrpc, method, id. Return -32700 (Parse error) for malformed JSON, -32600 (Invalid Request) for missing fields.
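A minimal sketch of the validation described above, assuming the raw request body is available as a string (for example from a raw-body parser instead of express.json()). The names parseJsonRpc and ParseResult are hypothetical, and batch requests (arrays) are rejected here only to keep the example short.

```typescript
// Hypothetical helper: maps malformed input to the JSON-RPC 2.0 error codes
// named in the text (-32700 parse error, -32600 invalid request).
type JsonRpcError = { code: number; message: string };

type ParseResult =
  | { ok: true; request: { jsonrpc: "2.0"; id: string | number; method: string; params?: unknown } }
  | { ok: false; error: JsonRpcError };

function parseJsonRpc(rawBody: string): ParseResult {
  let body: unknown;
  try {
    body = JSON.parse(rawBody);
  } catch {
    // The body is not valid JSON at all.
    return { ok: false, error: { code: -32700, message: "Parse error" } };
  }

  const invalid: ParseResult = { ok: false, error: { code: -32600, message: "Invalid Request" } };

  // Assumption: only single request objects are accepted, no batches.
  if (typeof body !== "object" || body === null || Array.isArray(body)) {
    return invalid;
  }

  const { jsonrpc, id, method, params } = body as Record<string, unknown>;
  if (jsonrpc !== "2.0" || typeof method !== "string") {
    return invalid;
  }
  if (typeof id !== "string" && typeof id !== "number") {
    return invalid;
  }

  return { ok: true, request: { jsonrpc: "2.0", id, method, params } };
}
```

When ok is false, the POST handler can answer with { jsonrpc: "2.0", id: null, error }, since JSON-RPC 2.0 uses a null id when the request id cannot be determined.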
Step 2: Task state machine
2.1. Task model w/ all A2A lifecycle states:
type TaskState =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed"
  | "canceled";
interface Task {
  id: string;
  sessionId: string;
  status: {
    state: TaskState;
    message?: Message;
    timestamp: string;
  };
  history?: TaskStatus[];
  artifacts?: Artifact[];
  metadata?: Record<string, unknown>;
}
interface Message {
  role: "user" | "agent";
  parts: Part[];
}
type Part =
  | { type: "text"; text: string }
  | { type: "file"; file: { name: string; mimeType: string; bytes?: string; uri?: string } }
  | { type: "data"; data: Record<string, unknown> };
2.2. State transitions:
submitted -> working | failed | canceled
working -> completed | failed | canceled | input-required
input-required -> working | failed | canceled
completed -> (terminal)
failed -> (terminal)
canceled -> (terminal)
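The transition table above can be encoded as a lookup map. This is a sketch only; ALLOWED_TRANSITIONS, canTransition, and isTerminal are illustrative names, not part of the A2A protocol.

```typescript
type TaskState =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed"
  | "canceled";

// One entry per state, listing the states it may move to.
// Terminal states map to an empty list.
const ALLOWED_TRANSITIONS: Record<TaskState, readonly TaskState[]> = {
  submitted: ["working", "failed", "canceled"],
  working: ["completed", "failed", "canceled", "input-required"],
  "input-required": ["working", "failed", "canceled"],
  completed: [],
  failed: [],
  canceled: [],
};

function canTransition(from: TaskState, to: TaskState): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

function isTerminal(state: TaskState): boolean {
  return ALLOWED_TRANSITIONS[state].length === 0;
}
```

Keeping the table as data means the guard for terminal states falls out of the same lookup instead of needing a separate check.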
2.3. Task store w/ CRUD:
class TaskStore {
  private tasks: Map<string, Task> = new Map();
  create(sessionId: string, message: Message): Task { ... }
  get(taskId: string): Task | undefined { ... }
  updateStatus(taskId: string, state: TaskState, message?: Message): Task { ... }
  addArtifact(taskId: string, artifact: Artifact): void { ... }
  cancel(taskId: string): Task { ... }
}
2.4. stateTransitionHistory enabled → append each status change to history w/ timestamps.
→ Task store enforces valid transitions + maintains history.
If err: invalid transition (completed → working) → JSON-RPC err code -32002 + descriptive msg. NEVER silently ignore.
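One possible shape for a store that enforces transitions and records history, as a sketch. InMemoryTaskStore and InvalidTransitionError are hypothetical names, the task model is reduced to id, status, and history, and the -32002 code follows the convention used in this document.

```typescript
type TaskState =
  | "submitted" | "working" | "input-required"
  | "completed" | "failed" | "canceled";

interface TaskStatus { state: TaskState; timestamp: string }
interface Task { id: string; status: TaskStatus; history: TaskStatus[] }

const NEXT_STATES: Record<TaskState, readonly TaskState[]> = {
  submitted: ["working", "failed", "canceled"],
  working: ["completed", "failed", "canceled", "input-required"],
  "input-required": ["working", "failed", "canceled"],
  completed: [],
  failed: [],
  canceled: [],
};

// Carries the JSON-RPC error code so handlers can map it to a response.
class InvalidTransitionError extends Error {
  readonly code = -32002;
  constructor(taskId: string, from: TaskState, to: TaskState) {
    super(`Invalid transition for task ${taskId}: ${from} -> ${to}`);
    this.name = "InvalidTransitionError";
  }
}

class InMemoryTaskStore {
  private tasks = new Map<string, Task>();

  create(id: string): Task {
    const status: TaskStatus = { state: "submitted", timestamp: new Date().toISOString() };
    const task: Task = { id, status, history: [status] };
    this.tasks.set(id, task);
    return task;
  }

  get(id: string): Task | undefined {
    return this.tasks.get(id);
  }

  updateStatus(id: string, state: TaskState): Task {
    const task = this.tasks.get(id);
    if (!task) throw new Error(`Task not found: ${id}`);
    // Reject instead of silently ignoring an illegal move.
    if (!NEXT_STATES[task.status.state].includes(state)) {
      throw new InvalidTransitionError(id, task.status.state, state);
    }
    task.status = { state, timestamp: new Date().toISOString() };
    task.history.push(task.status); // Append every change with its timestamp.
    return task;
  }
}
```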
Step 3: tasks/send + tasks/get
3.1. tasks/send (primary):
function handleTaskSend(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, sessionId, message } = request.params as TaskSendParams;
  // Create or resume task
  let task = taskStore.get(taskId);
  if (!task) {
    task = taskStore.create(sessionId, message);
  } else if (task.status.state === "input-required") {
    taskStore.updateStatus(task.id, "working");
  }
  // Route to skill handler based on message content
  const skill = matchSkill(message);
  if (!skill) {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: "No matching skill for this request." }],
    });
    return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
  }
  // Execute skill (async: task will transition to working, then completed/failed)
  executeSkill(skill, task, message).catch((error) => {
    taskStore.updateStatus(task.id, "failed", {
      role: "agent",
      parts: [{ type: "text", text: error.message }],
    });
  });
  return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}
3.2. tasks/get:
function handleTaskGet(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId, historyLength } = request.params as TaskGetParams;
  const task = taskStore.get(taskId);
  if (!task) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32001, message: `Task not found: ${taskId}` },
    };
  }
  // Optionally trim history to requested length.
  // slice(-0) would return the full array, so handle 0 explicitly.
  const result = historyLength !== undefined
    ? { ...task, history: historyLength > 0 ? task.history?.slice(-historyLength) : [] }
    : task;
  return { jsonrpc: "2.0", id: request.id, result };
}
3.3. tasks/cancel:
function handleTaskCancel(request: JsonRpcRequest): JsonRpcResponse {
  const { id: taskId } = request.params as TaskCancelParams;
  try {
    const task = taskStore.cancel(taskId);
    return { jsonrpc: "2.0", id: request.id, result: task };
  } catch (error) {
    return {
      jsonrpc: "2.0",
      id: request.id,
      error: { code: -32002, message: (error as Error).message },
    };
  }
}
→ Working tasks/send, tasks/get, tasks/cancel manage lifecycle.
If err: skill match fails → task in failed state w/ descriptive msg. Store full → -32003 (resource exhausted).
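The handlers above call matchSkill without defining it. One naive way to route a message is keyword matching against skill tags, sketched below. This is an assumption, not the skill's prescribed method: SkillDef is a hypothetical reduced shape, and this version takes the skill list as a second argument so it stays self-contained.

```typescript
// Hypothetical reduced skill shape: only the fields this sketch needs.
interface SkillDef { id: string; tags: string[] }

type Part =
  | { type: "text"; text: string }
  | { type: "data"; data: Record<string, unknown> };

interface Message { role: "user" | "agent"; parts: Part[] }

function matchSkill(message: Message, skills: SkillDef[]): SkillDef | undefined {
  // Collect all text parts into one lowercase string.
  const text = message.parts
    .filter((part): part is { type: "text"; text: string } => part.type === "text")
    .map((part) => part.text.toLowerCase())
    .join(" ");

  // First skill with any tag appearing in the text wins.
  return skills.find((skill) =>
    skill.tags.some((tag) => text.includes(tag.toLowerCase())),
  );
}
```

An undefined result corresponds to the "No matching skill" branch, where the task moves to failed with a descriptive message.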
Step 4: SSE for tasks/sendSubscribe
4.1. SSE endpoint:
app.post("/subscribe", (req, res) => {
  const request = req.body as JsonRpcRequest;
  if (request.method !== "tasks/sendSubscribe") {
    res.status(400).json({ error: "Only tasks/sendSubscribe supported" });
    return;
  }
  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  const { id: taskId, sessionId, message } = request.params as TaskSendParams;
  const task = taskStore.get(taskId) ?? taskStore.create(sessionId, message);
  // Send initial status
  sendSSEEvent(res, "status", {
    id: request.id,
    result: { id: task.id, status: task.status },
  });
  // Subscribe to task updates
  const unsubscribe = taskStore.onUpdate(task.id, (updatedTask) => {
    if (updatedTask.status.state === "working") {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status },
      });
    }
    if (updatedTask.artifacts?.length) {
      sendSSEEvent(res, "artifact", {
        id: request.id,
        result: { id: updatedTask.id, artifact: updatedTask.artifacts.at(-1) },
      });
    }
    // Close stream on terminal states
    if (["completed", "failed", "canceled"].includes(updatedTask.status.state)) {
      sendSSEEvent(res, "status", {
        id: request.id,
        result: { id: updatedTask.id, status: updatedTask.status, final: true },
      });
      unsubscribe();
      res.end();
    }
  });
  // Handle client disconnect
  req.on("close", () => {
    unsubscribe();
  });
});
function sendSSEEvent(res: Response, event: string, data: unknown): void {
  res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}
4.2. Event emitter / pub-sub in task store:
class TaskStore {
  private listeners: Map<string, Set<(task: Task) => void>> = new Map();
  onUpdate(taskId: string, callback: (task: Task) => void): () => void {
    if (!this.listeners.has(taskId)) {
      this.listeners.set(taskId, new Set());
    }
    this.listeners.get(taskId)!.add(callback);
    return () => this.listeners.get(taskId)?.delete(callback);
  }
  private notifyListeners(taskId: string): void {
    const task = this.get(taskId);
    if (task) {
      this.listeners.get(taskId)?.forEach((cb) => cb(task));
    }
  }
}
4.3. Emit events from all state transitions + artifact additions.
→ SSE streams real-time status + artifact events.
If err: SSE drops → client can reconnect + use tasks/get for current state. Store must not depend on active SSE.
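A small sketch of a pub-sub helper that keeps the store independent of active SSE connections: emitting with no subscribers is a no-op, and a failing subscriber does not interrupt the others. TaskEvents is a hypothetical standalone class; in the store above the same logic would sit behind onUpdate and notifyListeners.

```typescript
type Listener<T> = (value: T) => void;

class TaskEvents<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  // Registers a callback and returns an unsubscribe function.
  onUpdate(taskId: string, callback: Listener<T>): () => void {
    const set = this.listeners.get(taskId) ?? new Set<Listener<T>>();
    set.add(callback);
    this.listeners.set(taskId, set);
    return () => {
      set.delete(callback);
      // Drop empty sets so finished tasks do not leak map entries.
      if (set.size === 0 && this.listeners.get(taskId) === set) {
        this.listeners.delete(taskId);
      }
    };
  }

  // Returns how many subscribers received the value.
  emit(taskId: string, value: T): number {
    const set = this.listeners.get(taskId);
    if (!set) return 0; // No subscribers: the store keeps working without SSE.
    let delivered = 0;
    // Iterate over a copy so callbacks may unsubscribe while being notified.
    for (const callback of Array.from(set)) {
      try {
        callback(value);
        delivered += 1;
      } catch {
        // A broken stream must not block other subscribers or the store.
      }
    }
    return delivered;
  }
}
```

Calling emit from every status change and artifact addition covers step 4.3, and a reconnecting client can still read the current state through tasks/get.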
Step 5: Push webhook support
5.1. If pushNotifications in Agent Card → impl webhook registration via tasks/pushNotification/set:
- Accept PushNotificationConfig w/ url (HTTPS req'd), opt token, events array (["status", "artifact"])
- Validate HTTPS → reject w/ -32004 otherwise
- Store config in task store, keyed by task ID
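The HTTPS check in 5.1 might look like the sketch below. validatePushConfig and ValidationResult are hypothetical names, the config shape is reduced to the three fields listed above, and the -32004 code follows this document's convention.

```typescript
interface PushNotificationConfig {
  url: string;
  token?: string;
  events?: string[];
}

type ValidationResult =
  | { ok: true }
  | { ok: false; error: { code: number; message: string } };

function validatePushConfig(config: PushNotificationConfig): ValidationResult {
  let parsed: URL;
  try {
    parsed = new URL(config.url);
  } catch {
    // Not parseable as an absolute URL at all.
    return { ok: false, error: { code: -32004, message: "Push notification url is not a valid URL" } };
  }
  if (parsed.protocol !== "https:") {
    return { ok: false, error: { code: -32004, message: "Push notification url must use HTTPS" } };
  }
  return { ok: true };
}
```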
5.2. Webhook callbacks on state changes:
- Each transition / artifact → check registered config
- POST JSON payload w/ taskId, eventType, status, timestamp
- Authorization: Bearer <token> if provided
5.3. Retry logic (exponential backoff, max 3).
5.4. tasks/pushNotification/get retrieves config.
→ Webhook registration + delivery w/ retry.
If err: push failures MUST NOT affect task exec. Log + continue. Persistent unreachable → remove after max retries.
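A sketch of the retry loop from 5.3, under two assumptions: "max 3" is read as one initial attempt plus up to three retries, and the base delay of 500 ms is arbitrary. The send and sleep functions are injected so the HTTP call and the timer stay outside the example; deliverWithRetry and backoffDelayMs are hypothetical names.

```typescript
// Delay before retry number `attempt` (0-based): base, 2x base, 4x base, ...
function backoffDelayMs(attempt: number, baseMs = 500): number {
  return baseMs * 2 ** attempt;
}

async function deliverWithRetry(
  send: () => Promise<boolean>, // Resolves true when the webhook accepted the POST.
  maxRetries = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<boolean> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      if (await send()) return true;
    } catch {
      // Swallow the error: push failures must not affect task execution.
    }
    if (attempt < maxRetries) {
      await sleep(backoffDelayMs(attempt));
    }
  }
  // Caller can log and remove the unreachable webhook config.
  return false;
}
```

Because the function never throws, the caller can fire it without awaiting and task execution continues regardless of the delivery outcome.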
Step 6: Agent Card for discovery
6.1. Load + serve at startup:
- Parse agent-card.json + validate capabilities match impl
- Throw at startup if card advertises streaming: true but SSE disabled
- Throw if pushNotifications: true but webhooks disabled
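The startup checks in 6.1 can be sketched as a single assertion function. ServerFeatures and assertCardMatchesImpl are hypothetical names; the capability flags are the ones named in this document.

```typescript
interface AgentCapabilities {
  streaming?: boolean;
  pushNotifications?: boolean;
  stateTransitionHistory?: boolean;
}

// Hypothetical description of what this server build actually enables.
interface ServerFeatures {
  sse: boolean;
  webhooks: boolean;
}

function assertCardMatchesImpl(capabilities: AgentCapabilities, features: ServerFeatures): void {
  const problems: string[] = [];
  if (capabilities.streaming && !features.sse) {
    problems.push("card advertises streaming: true but SSE is disabled");
  }
  if (capabilities.pushNotifications && !features.webhooks) {
    problems.push("card advertises pushNotifications: true but webhooks are disabled");
  }
  if (problems.length > 0) {
    // Fail fast so a drifting card never reaches clients.
    throw new Error(`Agent Card mismatch: ${problems.join("; ")}`);
  }
}
```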
6.2. CORS for cross-origin discovery:
- Access-Control-Allow-Origin: * on /.well-known/agent.json
- Allow GET + OPTIONS
6.3. Auth middleware per card scheme:
- Skip auth on /.well-known/agent.json (always public)
- Other endpoints → validate Authorization / API key
- HTTP 401 + JSON-RPC -32000 for unauthorized
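A framework-free sketch of the auth decision in 6.3, assuming a single static bearer token. checkAuth, AuthResult, and PUBLIC_PATHS are hypothetical names; a real deployment would follow whatever scheme the Agent Card declares.

```typescript
type AuthResult =
  | { allowed: true }
  | { allowed: false; httpStatus: 401; error: { code: -32000; message: string } };

// The Agent Card must stay reachable without credentials.
const PUBLIC_PATHS = new Set(["/.well-known/agent.json"]);

function checkAuth(
  path: string,
  authorizationHeader: string | undefined,
  expectedToken: string,
): AuthResult {
  if (PUBLIC_PATHS.has(path)) {
    return { allowed: true };
  }
  const match = /^Bearer (.+)$/.exec(authorizationHeader ?? "");
  if (match && match[1] === expectedToken) {
    return { allowed: true };
  }
  return {
    allowed: false,
    httpStatus: 401,
    error: { code: -32000, message: "Unauthorized" },
  };
}
```

The plain string comparison keeps the example short; a constant-time comparison such as Node's crypto.timingSafeEqual is the safer choice for real tokens.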
6.4. Start + verify E2E:
# Start server
npm run dev
# Fetch Agent Card
curl -s http://localhost:3000/.well-known/agent.json | python3 -m json.tool
# Send a task
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tasks/send","params":{"id":"task-1","sessionId":"session-1","message":{"role":"user","parts":[{"type":"text","text":"Analyze my dataset"}]}}}'
→ Running server serves Agent Card + accepts tasks + manages lifecycle.
If err: capabilities mismatch impl → startup validation (6.1) catches. Fix impl or update card.
Check
- Server serves Agent Card at /.well-known/agent.json
- tasks/send creates + transitions lifecycle
- tasks/get retrieves status + artifacts
- tasks/cancel → canceled
- SSE sends real-time status + artifact (if enabled)
- Push webhooks deliver on state changes (if enabled)
- Invalid transitions → JSON-RPC errors
- Auth rejects unauthorized
- Card capabilities match impl
- All JSON-RPC responses include jsonrpc: "2.0" + correct id
Traps
- Missing JSON-RPC codes: use the standard -32700 (parse), -32600 (invalid req), -32601 (method not found); custom codes for domain errors.
- Task ID collisions: UUIDs. Client-provided → validate uniqueness.
- SSE leaks: clean up subscriptions on disconnect. req.on("close") detects.
- Blocking skill exec: long-running → async. Return submitted/working immediately, update via events.
- Agent Card drift: impl changes, card not updated → wrong client expectations. Validate at startup.
- Ignore terminal states: completed/failed/canceled → no further transitions. Guard in state machine.
→
- design-a2a-agent-card: design the card this server implements
- test-a2a-interop: validate against A2A conformance tests
- build-custom-mcp-server: MCP patterns inform A2A impl
- scaffold-mcp-server: scaffolding applicable
- configure-ingress-networking: prod deploy w/ TLS + routing
