when-chaining-agent-pipelines-use-stream-chain
About
This skill enables chaining agent outputs as inputs in sequential or parallel pipelines for data flow orchestration. Use it when you need to coordinate multiple agents in workflows with streaming data between them. It provides pipeline configuration, streaming flows, and performance metrics for intermediate-level agent coordination.
Quick Install
Claude Code
Recommended: copy and paste this command into Claude Code to install the skill:
/plugin add https://github.com/DNYoussef/ai-chrome-extension
Or install manually:
git clone https://github.com/DNYoussef/ai-chrome-extension.git ~/.claude/skills/when-chaining-agent-pipelines-use-stream-chain
Skill Documentation
Agent Pipeline Chaining SOP
Overview
This skill implements agent pipeline chaining where outputs from one agent become inputs to the next, supporting both sequential and parallel execution patterns with streaming data flows.
Agents & Responsibilities
task-orchestrator
Role: Pipeline coordination and orchestration
Responsibilities:
- Design pipeline architecture
- Connect agent stages
- Monitor data flow
- Handle pipeline errors
memory-coordinator
Role: Data flow and state management
Responsibilities:
- Store intermediate results
- Coordinate data passing
- Manage pipeline state
- Ensure data consistency
Phase 1: Design Pipeline
Objective
Design pipeline architecture with stages, data flows, and execution strategy.
Scripts
# Design pipeline architecture
npx claude-flow@alpha pipeline design \
--stages "research,analyze,code,test,review" \
--flow sequential \
--output pipeline-design.json
# Define data flow
npx claude-flow@alpha pipeline dataflow \
--design pipeline-design.json \
--output dataflow-spec.json
# Visualize pipeline
npx claude-flow@alpha pipeline visualize \
--design pipeline-design.json \
--output pipeline-diagram.png
# Store design in memory
npx claude-flow@alpha memory store \
--key "pipeline/design" \
--file pipeline-design.json
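The design command generates pipeline-design.json, whose schema is not documented in this SOP. The sketch below is a hypothetical shape only, assuming a flow field and a stages array; the stage names come from the command above and the agent types from Phase 2 (the reviewer type for stage 5 is itself an assumption).
# Hypothetical shape of pipeline-design.json (schema assumed, not confirmed)
{
  "flow": "sequential",
  "stages": [
    { "id": 1, "name": "research", "agent": "researcher" },
    { "id": 2, "name": "analyze", "agent": "analyst" },
    { "id": 3, "name": "code", "agent": "coder" },
    { "id": 4, "name": "test", "agent": "tester" },
    { "id": 5, "name": "review", "agent": "reviewer" }
  ]
}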
Pipeline Patterns
Sequential Pipeline:
Agent1 → Agent2 → Agent3 → Agent4
Parallel Pipeline:
         ┌─ Agent2 ─┐
Agent1 ──├─ Agent3 ─┤── Agent5
         └─ Agent4 ─┘
Hybrid Pipeline:
Agent1 → ┬─ Agent2 ─┐
         └─ Agent3 ─┴─ Agent4 → Agent5
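To declare the parallel pattern at design time, the --flow flag presumably accepts other values; only sequential is demonstrated in this SOP, so the value below is an assumption mirrored from the --strategy parallel used in Phase 3.
# Assumed flag value: --flow parallel is not demonstrated in this document
npx claude-flow@alpha pipeline design \
  --stages "research,analyze,code,test,review" \
  --flow parallel \
  --output parallel-design.json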
Phase 2: Connect Agents
Objective
Connect agents with proper data flow channels and state management.
Scripts
# Initialize pipeline
npx claude-flow@alpha pipeline init \
--design pipeline-design.json
# Spawn pipeline agents
npx claude-flow@alpha agent spawn --type researcher --pipeline-stage 1
npx claude-flow@alpha agent spawn --type analyst --pipeline-stage 2
npx claude-flow@alpha agent spawn --type coder --pipeline-stage 3
npx claude-flow@alpha agent spawn --type tester --pipeline-stage 4
# Connect pipeline stages
npx claude-flow@alpha pipeline connect \
--from-stage 1 --to-stage 2 \
--data-channel "memory"
npx claude-flow@alpha pipeline connect \
--from-stage 2 --to-stage 3 \
--data-channel "stream"
# Verify connections
npx claude-flow@alpha pipeline status --show-connections
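Note that the Phase 1 design has five stages, while the commands above spawn four agents and connect only stages 1→2 and 2→3. A sketch completing the wiring with the same documented flags (the reviewer agent type for stage 5 is an assumption):
# Spawn the stage-5 agent; reviewer type assumed, not shown above
npx claude-flow@alpha agent spawn --type reviewer --pipeline-stage 5
# Complete the remaining connections
npx claude-flow@alpha pipeline connect \
  --from-stage 3 --to-stage 4 \
  --data-channel "stream"
npx claude-flow@alpha pipeline connect \
  --from-stage 4 --to-stage 5 \
  --data-channel "memory"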
Data Flow Mechanisms
Memory-Based:
# Agent 1 stores output
npx claude-flow@alpha memory store \
--key "pipeline/stage-1/output" \
--value "research findings..."
# Agent 2 retrieves input
npx claude-flow@alpha memory retrieve \
--key "pipeline/stage-1/output"
Stream-Based:
# Agent 1 streams output
npx claude-flow@alpha stream write \
--channel "stage-1-to-2" \
--data "streaming data..."
# Agent 2 consumes stream
npx claude-flow@alpha stream read \
--channel "stage-1-to-2"
Phase 3: Execute Pipeline
Objective
Execute pipeline with proper sequencing and data flow.
Scripts
# Execute sequential pipeline
npx claude-flow@alpha pipeline execute \
--design pipeline-design.json \
--input initial-data.json \
--strategy sequential
# Execute parallel pipeline
npx claude-flow@alpha pipeline execute \
--design pipeline-design.json \
--input initial-data.json \
--strategy parallel \
--max-parallelism 3
# Monitor execution
npx claude-flow@alpha pipeline monitor --interval 5
# Track stage progress
npx claude-flow@alpha pipeline stages --show-progress
Execution Strategies
Sequential:
- Stages execute one after another
- Output of stage N is input to stage N+1
- Simple error handling
- Predictable execution time
Parallel:
- Independent stages execute simultaneously
- Outputs merged at synchronization points
- Complex error handling
- Faster overall execution
Adaptive:
- Dynamically switches between sequential and parallel
- Based on stage dependencies and resource availability
- Optimizes for throughput
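The adaptive strategy is described above but never invoked in this SOP. Assuming it uses the same --strategy flag as the sequential and parallel runs, a sketch:
# Assumed flag value: adaptive is listed above but not demonstrated
npx claude-flow@alpha pipeline execute \
  --design pipeline-design.json \
  --input initial-data.json \
  --strategy adaptive \
  --max-parallelism 3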
Phase 4: Monitor Streaming
Objective
Monitor data flow and pipeline execution in real-time.
Scripts
# Monitor data flow
npx claude-flow@alpha stream monitor \
--all-channels \
--interval 2 \
--output stream-metrics.json
# Track stage throughput
npx claude-flow@alpha pipeline metrics \
--metric throughput \
--per-stage
# Monitor backpressure
npx claude-flow@alpha stream backpressure --detect
# Generate flow report
npx claude-flow@alpha pipeline report \
--include-timing \
--include-throughput \
--output pipeline-report.md
Key Metrics
- Stage Throughput: Items processed per minute per stage
- Pipeline Latency: End-to-end processing time
- Backpressure: Queue buildup at stage boundaries
- Error Rate: Failures per stage
- Resource Utilization: CPU/memory per agent
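If stream-metrics.json follows a per-channel layout, these metrics can be pulled with jq. The key names below are assumptions about the file's structure, not a documented schema.
# Hypothetical: per-channel throughput (channels array and field names assumed)
jq '.channels[] | {channel: .name, itemsPerMinute: .throughput}' stream-metrics.json
# Hypothetical: channels whose queue depth suggests backpressure building up
jq '.channels[] | select(.queueDepth > 100) | .name' stream-metrics.json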
Phase 5: Validate Results
Objective
Validate pipeline outputs and ensure data integrity.
Scripts
# Collect pipeline results
npx claude-flow@alpha pipeline results \
--output pipeline-results.json
# Validate data integrity
npx claude-flow@alpha pipeline validate \
--results pipeline-results.json \
--schema validation-schema.json
# Compare with expected output
npx claude-flow@alpha pipeline compare \
--actual pipeline-results.json \
--expected expected-output.json
# Generate validation report
npx claude-flow@alpha pipeline report \
--type validation \
--output validation-report.md
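The validation-schema.json referenced above is never shown. A minimal JSON Schema sketch, under the assumption that pipeline-results.json carries one record per stage with a status and an output field:
# Hypothetical validation-schema.json (structure of pipeline-results.json assumed)
{
  "type": "object",
  "required": ["stages"],
  "properties": {
    "stages": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["id", "status", "output"],
        "properties": {
          "id": { "type": "integer" },
          "status": { "type": "string", "enum": ["completed", "failed"] },
          "output": { "type": "object" }
        }
      }
    }
  }
}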
Success Criteria
- Pipeline design complete
- All stages connected
- Data flow functional
- Outputs validated
- Performance acceptable
Performance Targets
- Stage latency: <30 seconds average
- Pipeline throughput: ≥10 items/minute
- Error rate: <2%
- Data integrity: 100%
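A CI-style gate can compare measured values against these targets. This sketch assumes the metrics command can emit JSON with the field names shown, which this document does not confirm.
# Hypothetical gate: JSON output and field names are assumptions, not documented
metrics_json=$(npx claude-flow@alpha pipeline metrics --metric throughput --per-stage)
throughput=$(echo "$metrics_json" | jq -r '.pipeline.itemsPerMinute')
error_rate=$(echo "$metrics_json" | jq -r '.pipeline.errorRate')
if (( $(echo "$throughput < 10" | bc -l) )) || (( $(echo "$error_rate > 0.02" | bc -l) )); then
  echo "Performance targets missed: throughput=${throughput}, error_rate=${error_rate}" >&2
  exit 1
fi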
Best Practices
- Clear Stage Boundaries: Each stage has single responsibility
- Data Validation: Validate outputs before passing to next stage
- Error Handling: Implement retry and fallback mechanisms (see the retry sketch after this list)
- Backpressure Management: Prevent queue overflow
- Monitoring: Track metrics continuously
- State Management: Use memory coordination for state
- Testing: Test each stage independently
- Documentation: Document data schemas and flows
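For the error-handling practice above, a simple retry wrapper can guard any stage command. The backoff policy here is illustrative shell, not a claude-flow feature.
# Retry a stage command up to 3 times with exponential backoff
retry_stage() {
  local attempt=1 max=3 delay=5
  until "$@"; do
    if (( attempt >= max )); then
      echo "Stage failed after ${max} attempts: $*" >&2
      return 1
    fi
    sleep "$delay"
    delay=$(( delay * 2 ))
    attempt=$(( attempt + 1 ))
  done
}
# Usage with a documented command:
retry_stage npx claude-flow@alpha pipeline execute \
  --design pipeline-design.json --input initial-data.json --strategy sequential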
Common Issues & Solutions
Issue: Pipeline Stalls
Symptoms: Stages stop processing.
Solution: Check for backpressure; increase buffer sizes.
Issue: Data Loss
Symptoms: Missing data in outputs.
Solution: Implement an acknowledgment mechanism; use reliable channels.
Issue: High Latency
Symptoms: Slow end-to-end processing.
Solution: Identify the bottleneck stage; add parallelism.
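For stall detection, the backpressure detector from Phase 4 can be polled in a loop. Grepping its output for a keyword is an assumption about how the command reports; only the flag itself is documented above.
# Poll for backpressure every 30s; the "detected" keyword is an assumed output format
while true; do
  if npx claude-flow@alpha stream backpressure --detect | grep -qi "detected"; then
    echo "Backpressure detected at $(date); consider increasing buffer sizes" >&2
  fi
  sleep 30
done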
Integration Points
- swarm-orchestration: For complex multi-pipeline orchestration
- advanced-swarm: For optimized agent coordination
- performance-analysis: For bottleneck detection
GitHub Repository: https://github.com/DNYoussef/ai-chrome-extension
Related Recommended Skills
sparc-methodology
Development: SPARC Methodology gives developers 17 development modes spanning requirements analysis through deployment monitoring, enabling systematic software development via multi-agent collaboration. It supports TDD workflows and architecture design, and suits full-lifecycle management of complex projects. Developers can invoke specific modes on demand to improve code quality and development efficiency.
n8n-workflow-testing-fundamentals
Other: This skill offers developers a comprehensive guide to testing n8n workflows, covering core scenarios such as the execution lifecycle, node connection patterns, and data flow validation. It is especially useful for verifying error-handling strategies and measuring performance when testing n8n automations, and includes structured test checklists and best practices. Key features include pre-execution validation, testing with real data, and standardized procedures for checking data flow between nodes.
when-orchestrating-swarm-use-swarm-orchestration
Other: This skill coordinates complex multi-agent swarms, supporting intelligent task decomposition, distributed execution, and result synthesis. It suits scenarios where multiple specialized agents must collaborate on complex workflows, can generate task decomposition trees, and provides performance metrics. Developers can use it for efficient cross-agent coordination and monitoring.
github-release-management
Other: This skill provides developers with AI-swarm-driven, end-to-end automation of GitHub releases. It automatically handles release steps such as version management, testing, deployment, and rollback, and is especially suited to complex CI/CD scenarios that coordinate multi-step releases. By integrating the GitHub CLI and optional multi-AI coordination tools, it can significantly improve the efficiency and reliability of software releases.
