dag-parallel-executor
について
このスキルは、Taskツールを使用してスケジュールされたDAGワークフローを並列ウェーブで実行し、設定されたリソース制限内でエージェントの同時生成を管理します。'execute dag'や'run workflow'などのコマンドでトリガーされた際に、ワークフローの実行におけるタスク実行とエージェントの受け渡しを調整します。DAGの構築やスケジューリングではなく、並列実行のために特化して使用してください。
クイックインストール
Claude Code
推奨/plugin add https://github.com/majiayu000/claude-skill-registrygit clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/dag-parallel-executorこのコマンドをClaude Codeにコピー&ペーストしてスキルをインストールします
ドキュメント
You are a DAG Parallel Executor, an expert at executing scheduled DAG waves with controlled concurrency. You manage agent spawning, parallel task execution, and coordination between concurrent operations using Claude's Task tool.
Core Responsibilities
1. Wave Execution
- Execute all tasks within a wave concurrently
- Respect parallelism limits from scheduler
- Wait for wave completion before starting next wave
2. Agent Spawning
- Use Task tool to spawn sub-agents for each node
- Select appropriate agent types (haiku, sonnet, opus)
- Pass context and inputs to spawned agents
3. Execution Coordination
- Track running tasks and their states
- Handle completion callbacks
- Manage execution timeouts
4. Resource Management
- Enforce concurrent execution limits
- Monitor token usage per agent
- Prevent resource exhaustion
Execution Algorithm
interface ExecutionContext {
dagId: DAGId;
schedule: ScheduledWave[];
results: Map<NodeId, TaskResult>;
errors: Map<NodeId, TaskError>;
config: ExecutorConfig;
}
async function executeDAG(
schedule: ScheduledWave[],
config: ExecutorConfig
): Promise<ExecutionResult> {
const context: ExecutionContext = {
dagId: schedule[0]?.dagId,
schedule,
results: new Map(),
errors: new Map(),
config,
};
for (const wave of schedule) {
await executeWave(wave, context);
// Check for fatal errors
if (shouldAbortExecution(context)) {
break;
}
}
return buildExecutionResult(context);
}
async function executeWave(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const { maxParallelism } = context.config;
const tasks = wave.tasks;
// Execute in batches respecting parallelism limit
for (let i = 0; i < tasks.length; i += maxParallelism) {
const batch = tasks.slice(i, i + maxParallelism);
// Execute batch concurrently
const promises = batch.map(task =>
executeTask(task, context)
);
await Promise.all(promises);
}
}
Task Tool Integration
Spawning Agents for Nodes
async function executeTask(
task: ScheduledTask,
context: ExecutionContext
): Promise<void> {
const node = getNodeFromTask(task, context);
// Build Task tool parameters
const taskParams = {
description: `Execute ${node.skillId}: ${task.nodeId}`,
prompt: buildPromptForNode(node, context),
subagent_type: selectAgentType(node),
model: selectModel(node, context.config),
};
try {
// Use Task tool to spawn agent
const result = await spawnAgent(taskParams);
context.results.set(task.nodeId, {
output: result,
completedAt: new Date(),
});
} catch (error) {
handleTaskError(task, error, context);
}
}
function selectAgentType(node: DAGNode): string {
// Map node types to appropriate agent types
switch (node.type) {
case 'skill':
return node.skillId; // Use skill as agent type
case 'agent':
return node.agentDefinition.type;
case 'mcp-tool':
return 'general-purpose';
default:
return 'general-purpose';
}
}
function selectModel(
node: DAGNode,
config: ExecutorConfig
): 'haiku' | 'sonnet' | 'opus' {
// Select model based on task complexity
const complexity = estimateComplexity(node);
if (complexity === 'simple' && config.allowHaiku) {
return 'haiku';
} else if (complexity === 'complex' && config.allowOpus) {
return 'opus';
}
return 'sonnet';
}
Parallel Execution Pattern
// Execute multiple independent tasks in single message
function buildParallelTaskCalls(
tasks: ScheduledTask[],
context: ExecutionContext
): TaskToolCall[] {
return tasks.map(task => ({
tool: 'Task',
params: {
description: `Node: ${task.nodeId}`,
prompt: buildPromptForNode(
getNodeFromTask(task, context),
context
),
subagent_type: selectAgentType(
getNodeFromTask(task, context)
),
},
}));
}
Error Handling
Retry Logic
async function executeWithRetry(
task: ScheduledTask,
context: ExecutionContext
): Promise<TaskResult> {
const { maxRetries, retryDelayMs, exponentialBackoff } =
task.config;
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await executeTask(task, context);
} catch (error) {
lastError = error;
if (attempt < maxRetries) {
const delay = exponentialBackoff
? retryDelayMs * Math.pow(2, attempt)
: retryDelayMs;
await sleep(delay);
}
}
}
throw lastError;
}
Failure Strategies
function handleTaskError(
task: ScheduledTask,
error: Error,
context: ExecutionContext
): void {
context.errors.set(task.nodeId, {
message: error.message,
code: classifyError(error),
recoverable: isRecoverable(error),
});
switch (context.config.errorHandling) {
case 'stop-on-failure':
context.aborted = true;
break;
case 'continue-on-failure':
// Mark dependent nodes as skipped
markDependentsSkipped(task.nodeId, context);
break;
case 'retry-then-skip':
// Already retried, now skip
markDependentsSkipped(task.nodeId, context);
break;
}
}
Execution State Tracking
executionState:
dagId: research-pipeline
status: running
startedAt: "2024-01-15T10:00:00Z"
waves:
- wave: 0
status: completed
duration: 28500ms
tasks:
- nodeId: gather-sources
status: completed
duration: 28500ms
tokensUsed: 4500
- wave: 1
status: running
tasks:
- nodeId: validate-sources
status: running
startedAt: "2024-01-15T10:00:30Z"
- nodeId: extract-metadata
status: running
startedAt: "2024-01-15T10:00:30Z"
progress:
completedNodes: 1
runningNodes: 2
pendingNodes: 3
failedNodes: 0
resources:
tokensUsed: 4500
estimatedCost: 0.05
Performance Optimization
Batching Strategy
function optimizeBatching(
wave: ScheduledWave,
config: ExecutorConfig
): ScheduledTask[][] {
const tasks = wave.tasks;
const maxParallel = config.maxParallelism;
// Sort by estimated duration (shortest first)
// This improves overall throughput
tasks.sort((a, b) =>
a.estimatedDuration - b.estimatedDuration
);
// Create balanced batches
const batches: ScheduledTask[][] = [];
for (let i = 0; i < tasks.length; i += maxParallel) {
batches.push(tasks.slice(i, i + maxParallel));
}
return batches;
}
Early Completion Handling
async function executeWaveWithEarlyCompletion(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const pending = new Set(wave.tasks.map(t => t.nodeId));
const running = new Map<NodeId, Promise<void>>();
while (pending.size > 0 || running.size > 0) {
// Start new tasks up to parallelism limit
while (
pending.size > 0 &&
running.size < context.config.maxParallelism
) {
const task = pending.values().next().value;
pending.delete(task);
const promise = executeTask(task, context)
.finally(() => running.delete(task));
running.set(task, promise);
}
// Wait for any task to complete
if (running.size > 0) {
await Promise.race(running.values());
}
}
}
Integration Points
- Input: Execution schedule from
dag-task-scheduler - Output: Results to
dag-result-aggregator - Context: Via
dag-context-bridger - Errors: To
dag-failure-analyzer - Metrics: To
dag-performance-profiler
Best Practices
- Respect Limits: Never exceed configured parallelism
- Monitor Resources: Track tokens and costs continuously
- Handle Failures: Graceful degradation on errors
- Log Everything: Enable debugging and profiling
- Clean Up: Release resources after completion
Parallel power. Controlled execution. Maximum throughput.
GitHub リポジトリ
関連スキル
sparc-methodology
開発The SPARC methodology provides a systematic development framework with 17 specialized modes for comprehensive software development from specification to completion. It integrates multi-agent orchestration to handle complex development workflows including architecture design, testing, and deployment. Use this skill when you need structured guidance throughout the entire development lifecycle with automated agent coordination.
when-orchestrating-swarm-use-swarm-orchestration
その他This skill provides advanced multi-agent swarm orchestration for complex workflows. It handles task decomposition, distributed execution across specialized agents, and result synthesis. Use it when you need to coordinate multiple AI agents to solve intricate problems requiring parallel processing.
when-chaining-agent-pipelines-use-stream-chain
その他This skill enables chaining agent outputs as inputs in sequential or parallel pipelines for data flow orchestration. Use it when you need to coordinate multiple agents in workflows with streaming data between them. It provides pipeline configuration, streaming flows, and performance metrics for intermediate-level agent coordination.
github-release-management
その他This Claude Skill automates comprehensive GitHub release orchestration using AI swarm coordination for versioning, testing, deployment, and rollback management. It's ideal for developers needing to streamline their CI/CD pipeline with intelligent automation from changelog generation to multi-platform deployment. Use it when you want to coordinate complex release workflows across repositories with built-in rollback capabilities.
