Agent skill
orchestrating-agents
Orchestrates parallel API instances, delegated sub-tasks, and multi-agent workflows with streaming and tool-enabled delegation patterns. Use for parallel analysis, multi-perspective reviews, or complex task decomposition.
Install this agent skill to your Project
npx add-skill https://github.com/oaustegard/claude-skills/tree/main/orchestrating-agents
Metadata
Additional technical details for this skill
- version
- 0.3.0
SKILL.md
Orchestrating Agents
This skill enables programmatic API invocations for advanced workflows including parallel processing, task delegation, and multi-agent analysis using the Anthropic API.
When to Use This Skill
Primary use cases:
- Parallel sub-tasks: Break complex analysis into simultaneous independent streams
- Multi-perspective analysis: Get 3-5 different expert viewpoints concurrently
- Delegation: Offload specific subtasks to specialized API instances
- Recursive workflows: Orchestrator coordinating multiple API instances
- High-volume processing: Batch process multiple items concurrently
Trigger patterns:
- "Parallel analysis", "multi-perspective review", "concurrent processing"
- "Delegate subtasks", "coordinate multiple agents"
- "Run analyses from different perspectives"
- "Get expert opinions from multiple angles"
Quick Start
Single Invocation
import sys
sys.path.append('/home/user/claude-skills/orchestrating-agents/scripts')
from claude_client import invoke_claude
response = invoke_claude(
prompt="Analyze this code for security vulnerabilities: ...",
model="claude-sonnet-4-6"
)
print(response)
Parallel Multi-Perspective Analysis
from claude_client import invoke_parallel
prompts = [
{
"prompt": "Analyze from security perspective: ...",
"system": "You are a security expert"
},
{
"prompt": "Analyze from performance perspective: ...",
"system": "You are a performance optimization expert"
},
{
"prompt": "Analyze from maintainability perspective: ...",
"system": "You are a software architecture expert"
}
]
results = invoke_parallel(prompts, model="claude-sonnet-4-6")
for i, result in enumerate(results):
print(f"\n=== Perspective {i+1} ===")
print(result)
Parallel with Shared Cached Context (Recommended)
For parallel operations with shared base context, use caching to reduce costs by up to 90%:
from claude_client import invoke_parallel
# Large context shared across all sub-agents (e.g., codebase, documentation)
base_context = """
<codebase>
...large codebase or documentation (1000+ tokens)...
</codebase>
"""
prompts = [
{"prompt": "Find security vulnerabilities in the authentication module"},
{"prompt": "Identify performance bottlenecks in the API layer"},
{"prompt": "Suggest refactoring opportunities in the database layer"}
]
# First sub-agent creates cache, subsequent ones reuse it
results = invoke_parallel(
prompts,
shared_system=base_context,
cache_shared_system=True # 90% cost reduction for cached content
)
Multi-Turn Conversation with Auto-Caching
For sub-agents that need multiple rounds of conversation:
from claude_client import ConversationThread
# Create a conversation thread (auto-caches history)
agent = ConversationThread(
system="You are a code refactoring expert with access to the codebase",
cache_system=True
)
# Turn 1: Initial analysis
response1 = agent.send("Analyze the UserAuth class for issues")
print(response1)
# Turn 2: Follow-up (reuses cached system + turn 1)
response2 = agent.send("How would you refactor the login method?")
print(response2)
# Turn 3: Implementation (reuses all previous context)
response3 = agent.send("Show me the refactored code")
print(response3)
Streaming Responses
For real-time feedback from sub-agents:
from claude_client import invoke_claude_streaming
def show_progress(chunk):
print(chunk, end='', flush=True)
response = invoke_claude_streaming(
"Write a comprehensive security analysis...",
callback=show_progress
)
Parallel Streaming
Monitor multiple sub-agents simultaneously:
from claude_client import invoke_parallel_streaming
def agent1_callback(chunk):
print(f"[Security] {chunk}", end='', flush=True)
def agent2_callback(chunk):
print(f"[Performance] {chunk}", end='', flush=True)
results = invoke_parallel_streaming(
[
{"prompt": "Security review: ..."},
{"prompt": "Performance review: ..."}
],
callbacks=[agent1_callback, agent2_callback]
)
Interruptible Operations
Cancel long-running parallel operations:
from claude_client import invoke_parallel_interruptible, InterruptToken
import threading
import time
token = InterruptToken()
# Run in background
def run_analysis():
results = invoke_parallel_interruptible(
prompts=[...],
interrupt_token=token
)
return results
thread = threading.Thread(target=run_analysis)
thread.start()
# Interrupt after 5 seconds
time.sleep(5)
token.interrupt()
Core Functions
invoke_claude()
Single synchronous invocation with full control:
invoke_claude(
prompt: str | list[dict],
model: str = "claude-sonnet-4-6",
system: str | list[dict] | None = None,
max_tokens: int = 4096,
temperature: float = 1.0,
streaming: bool = False,
cache_system: bool = False,
cache_prompt: bool = False,
messages: list[dict] | None = None,
**kwargs
) -> str
Parameters:
prompt: The user message (string or list of content blocks)model: Claude model to use (default: claude-sonnet-4-6)system: Optional system prompt (string or list of content blocks)max_tokens: Maximum tokens in response (default: 4096)temperature: Randomness 0-1 (default: 1.0)streaming: Enable streaming response (default: False)cache_system: Add cache_control to system prompt (requires 1024+ tokens, default: False)cache_prompt: Add cache_control to user prompt (requires 1024+ tokens, default: False)messages: Pre-built messages list for multi-turn (overrides prompt)**kwargs: Additional API parameters (top_p, top_k, etc.)
Returns: Response text as string
Note: Caching requires minimum 1,024 tokens per cache breakpoint. Cache lifetime is 5 minutes (refreshed on use).
invoke_parallel()
Concurrent invocations using lightweight workflow pattern:
invoke_parallel(
prompts: list[dict],
model: str = "claude-sonnet-4-6",
max_tokens: int = 4096,
max_workers: int = 5,
shared_system: str | list[dict] | None = None,
cache_shared_system: bool = False
) -> list[str]
Parameters:
prompts: List of dicts with 'prompt' (required) and optional 'system', 'temperature', 'cache_system', 'cache_prompt', etc.model: Claude model for all invocationsmax_tokens: Max tokens per responsemax_workers: Max concurrent API calls (default: 5, max: 10)shared_system: System context shared across ALL invocations (for cache efficiency)cache_shared_system: Add cache_control to shared_system (default: False)
Returns: List of response strings in same order as prompts
Note: For optimal cost savings, put large common context (1024+ tokens) in shared_system with cache_shared_system=True. First invocation creates cache, subsequent ones reuse it (90% cost reduction).
invoke_claude_streaming()
Stream responses in real-time with optional callbacks:
invoke_claude_streaming(
prompt: str | list[dict],
callback: callable = None,
model: str = "claude-sonnet-4-6",
system: str | list[dict] | None = None,
max_tokens: int = 4096,
temperature: float = 1.0,
cache_system: bool = False,
cache_prompt: bool = False,
**kwargs
) -> str
Parameters:
callback: Optional function called with each text chunk (str) as it arrives- (other parameters same as invoke_claude)
Returns: Complete accumulated response text
invoke_parallel_streaming()
Parallel invocations with per-agent streaming callbacks:
invoke_parallel_streaming(
prompts: list[dict],
callbacks: list[callable] = None,
model: str = "claude-sonnet-4-6",
max_tokens: int = 4096,
max_workers: int = 5,
shared_system: str | list[dict] | None = None,
cache_shared_system: bool = False
) -> list[str]
Parameters:
callbacks: Optional list of callback functions, one per prompt- (other parameters same as invoke_parallel)
invoke_parallel_interruptible()
Parallel invocations with cancellation support:
invoke_parallel_interruptible(
prompts: list[dict],
interrupt_token: InterruptToken = None,
# ... same other parameters as invoke_parallel
) -> list[str]
Parameters:
interrupt_token: Optional InterruptToken to signal cancellation- (other parameters same as invoke_parallel)
Returns: List of response strings (None for interrupted tasks)
ConversationThread
Manages multi-turn conversations with automatic caching:
thread = ConversationThread(
system: str | list[dict] | None = None,
model: str = "claude-sonnet-4-6",
max_tokens: int = 4096,
temperature: float = 1.0,
cache_system: bool = True
)
response = thread.send(
user_message: str | list[dict],
cache_history: bool = True
) -> str
Methods:
send(message, cache_history=True): Send message and get responseget_messages(): Get conversation historyclear(): Clear conversation history__len__(): Get number of turns
New in 0.3.0:
turn_countproperty: Number of completed turn pairssend_continuation(guidance, cache_history): Lightweight continuation turn (requires priorsend())max_turnsconstructor parameter: Optional turn limitcontinuation_promptconstructor parameter: Default continuation guidance
StallDetector
Monitors activity timestamps and detects unresponsive operations:
from claude_client import StallDetector
def handle_stall(task_id, idle_seconds):
print(f"Task {task_id} stalled for {idle_seconds:.1f}s")
detector = StallDetector(timeout=60.0, on_stall=handle_stall)
detector.register("task-1")
detector.start_monitoring(poll_interval=5.0)
# Call heartbeat() during streaming/progress
detector.heartbeat("task-1")
# When done
detector.unregister("task-1")
detector.stop_monitoring()
TaskTracker (task_state module)
Formal task lifecycle state machine with enforced transitions:
from task_state import TaskTracker, TaskState
tracker = TaskTracker(max_retries=3)
tracker.add("task-1", category="security")
tracker.claim("task-1") # UNCLAIMED → CLAIMED
tracker.start("task-1") # CLAIMED → RUNNING (increments attempt)
tracker.complete("task-1") # RUNNING → COMPLETED
# On failure with retry:
tracker.fail("task-2", error="timeout")
tracker.retry("task-2") # FAILED → RETRY_QUEUED (if under max_retries)
tracker.claim("task-2") # RETRY_QUEUED → CLAIMED
# Query state
tracker.active_count(category="security")
tracker.get_by_state(TaskState.RUNNING)
tracker.summary() # {"completed": 1, "running": 1, ...}
invoke_with_retry() (orchestration module)
Single invocation with exponential backoff:
from orchestration import invoke_with_retry
response = invoke_with_retry(
"Analyze this code...",
max_retries=3,
base_delay_ms=1000, # 1s, 2s, 4s backoff
max_delay_ms=10000, # capped at 10s
)
invoke_parallel_managed() (orchestration module)
Full-featured parallel invocations with all Symphony patterns:
from orchestration import invoke_parallel_managed, ConcurrencyLimiter
limiter = ConcurrencyLimiter(
global_limit=10,
category_limits={"security": 3, "perf": 3}
)
def reconcile(prompts, tracker):
# Filter out invalid/duplicate work before dispatch
return [p for p in prompts if should_run(p)]
results = invoke_parallel_managed(
prompts=[
{"prompt": "Security review...", "task_id": "sec-1", "category": "security"},
{"prompt": "Perf review...", "task_id": "perf-1", "category": "perf"},
],
reconcile=reconcile,
concurrency_limiter=limiter,
max_retries=3,
stall_timeout=60.0,
on_stall=lambda tid, idle: print(f"{tid} stalled"),
)
Example Workflows
See references/workflows.md for detailed examples including:
- Multi-expert code review
- Parallel document analysis
- Recursive task delegation
- Advanced Agent SDK delegation patterns
- Prompt caching workflows
Setup
Prerequisites:
-
Install anthropic library:
bashuv pip install anthropic -
Configure API key via project knowledge file:
Option 1 (recommended): Individual file
- Create document:
ANTHROPIC_API_KEY.txt - Content: Your API key (e.g.,
sk-ant-api03-...)
Option 2: Combined file
- Create document:
API_CREDENTIALS.json - Content:
json
{ "anthropic_api_key": "sk-ant-api03-..." }
Get your API key: https://console.anthropic.com/settings/keys
- Create document:
Installation check:
python3 -c "import anthropic; print(f'✓ anthropic {anthropic.__version__}')"
Error Handling
The module provides comprehensive error handling:
from claude_client import invoke_claude, ClaudeInvocationError
try:
response = invoke_claude("Your prompt here")
except ClaudeInvocationError as e:
print(f"API Error: {e}")
print(f"Status: {e.status_code}")
print(f"Details: {e.details}")
except ValueError as e:
print(f"Configuration Error: {e}")
Common errors:
- API key missing: Add ANTHROPIC_API_KEY.txt to project knowledge (see Setup above)
- Rate limits: Reduce max_workers or add delays
- Token limits: Reduce prompt size or max_tokens
- Network errors: Automatic retry with exponential backoff
Prompt Caching
For detailed caching workflows and best practices, see references/workflows.md.
Performance Considerations
Token efficiency:
- Parallel calls use more tokens but save wall-clock time
- Use prompt caching for shared context (90% cost reduction)
- Use concise system prompts to reduce overhead
- Consider token budgets when setting max_tokens
Rate limits:
- Anthropic API has per-minute rate limits
- Default max_workers=5 is safe for most tiers
- Adjust based on your API tier and rate limits
Cost management:
- Each invocation consumes API credits
- Monitor usage in Anthropic Console
- Use smaller models (haiku) for simple tasks
- Use prompt caching for repeated context (90% savings)
- Cache lifetime: 5 minutes, refreshed on each use
Best Practices
-
Use parallel invocations for independent tasks only
- Don't parallelize sequential dependencies
- Each parallel task should be self-contained
-
Set appropriate system prompts
- Define clear roles/expertise for each instance
- Keeps responses focused and relevant
-
Handle errors gracefully
- Always wrap invocations in try-except
- Provide fallback behavior for failures
-
Test with small batches first
- Verify prompts work before scaling
- Check token usage and costs
-
Consider alternatives
- Not all tasks benefit from multiple instances
- Sometimes sequential with context is better
Token Efficiency
This skill uses ~800 tokens when loaded but enables powerful multi-agent patterns that can dramatically improve complex analysis quality and speed.
See Also
- references/api-reference.md - Detailed API documentation
- Anthropic API Docs - Official documentation
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
hello-demo
Delivers a static Hello World HTML demo page with bookmarklet. Use when user requests the hello demo, hello world demo, or demo page.
installing-skills
Install skills from github.com/oaustegard/claude-skills into /mnt/skills/user. Use when user mentions "install skills", "load skills", "add skills", "update skills", "refresh skills", or references a skill not currently installed.
extracting-keywords
Extract keywords from documents using YAKE algorithm with support for 34 languages (Arabic to Chinese). Use when users request keyword extraction, key terms, topic identification, content summarization, or document analysis. Includes domain-specific stopwords for AI/ML and life sciences. Optional deeper extraction mode (n=2+n=3 combined) for comprehensive coverage.
remembering
Advanced memory operations reference. Basic patterns (profile loading, simple recall/remember) are in project instructions. Consult this skill for background writes, memory versioning, complex queries, edge cases, session scoping, retention management, type-safe results, proactive memory hints, GitHub access detection, autonomous curation, episodic scoring, and decision traces.
check-tools
Validates development tool installations across Python, Node.js, Java, Go, Rust, C/C++, Git, and system utilities. Use when verifying environments or troubleshooting dependencies.
forecasting-reverso
Zero-shot univariate time series forecasting using the Reverso foundation model (NumPy/Numba CPU-only inference). Activate when users provide time series data and request forecasts, predictions, or extrapolations. Supports Reverso Small (550K params). Triggers on "forecast", "predict", "time series", "Reverso", or when tabular data with a temporal dimension needs future-value estimation.
Didn't find tool you were looking for?