Agent skill
python-asyncio
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/python-asyncio
SKILL.md
Quick Reference
| Function | Purpose | Code |
|---|---|---|
asyncio.run() |
Entry point | asyncio.run(main()) |
asyncio.gather() |
Concurrent tasks | await asyncio.gather(*tasks) |
asyncio.create_task() |
Fire-and-await | task = asyncio.create_task(coro) |
asyncio.TaskGroup() |
Structured concurrency | async with asyncio.TaskGroup() as tg: |
asyncio.Semaphore() |
Rate limiting | async with semaphore: |
asyncio.timeout() |
Timeout (3.11+) | async with asyncio.timeout(5.0): |
| Pattern | Sequential | Concurrent |
|---|---|---|
| Execution | await a(); await b() |
await gather(a(), b()) |
| Time | Sum of durations | Max of durations |
| Library | Use Case |
|---|---|
aiohttp |
HTTP client (most popular) |
httpx |
HTTP (sync + async) |
asyncpg |
PostgreSQL |
uvloop |
2-4x faster event loop |
When to Use This Skill
Use for async/concurrent programming:
- Network requests (HTTP, WebSockets)
- Database queries
- File I/O operations
- Multiple concurrent I/O operations
- FastAPI/Starlette async endpoints
Related skills:
- For FastAPI: see
python-fastapi - For type hints: see
python-type-hints - For gotchas: see
python-gotchas
Python Asyncio Complete Guide
Overview
Asyncio is Python's built-in framework for writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like network requests, file I/O, and database queries.
When to Use Asyncio
Good Use Cases
- Network requests (HTTP clients, WebSockets)
- Database queries
- File I/O operations
- Web servers (FastAPI, Starlette)
- Message queues
- Multiple concurrent I/O operations
When NOT to Use
- CPU-bound tasks (use multiprocessing instead)
- Simple sequential scripts
- Legacy codebases without async support
Core Concepts
Basic Async/Await
import asyncio
# Async function (coroutine)
async def fetch_data(url: str) -> dict:
# Simulated async I/O
await asyncio.sleep(1)
return {"url": url, "data": "..."}
# Running a coroutine
async def main():
result = await fetch_data("https://api.example.com")
print(result)
# Entry point
asyncio.run(main())
Concurrent Execution with gather()
import asyncio
import aiohttp
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return {"url": url, "status": response.status}
async def fetch_all(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# Run all requests concurrently
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3",
]
results = asyncio.run(fetch_all(urls))
TaskGroup (Python 3.11+)
import asyncio
async def process_item(item: str) -> str:
await asyncio.sleep(1)
return f"Processed: {item}"
async def process_all(items: list[str]) -> list[str]:
results = []
# TaskGroup provides structured concurrency
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_item(item)) for item in items]
# All tasks complete when exiting the context
return [task.result() for task in tasks]
# Exception handling with TaskGroup
async def process_with_errors(items: list[str]):
try:
async with asyncio.TaskGroup() as tg:
for item in items:
tg.create_task(process_item(item))
except* ValueError as eg:
# Handle ValueError exceptions
for exc in eg.exceptions:
print(f"ValueError: {exc}")
except* TypeError as eg:
# Handle TypeError exceptions
for exc in eg.exceptions:
print(f"TypeError: {exc}")
create_task() vs await
import asyncio
async def task_a():
await asyncio.sleep(2)
return "A done"
async def task_b():
await asyncio.sleep(1)
return "B done"
# Sequential (slower - 3 seconds total)
async def sequential():
result_a = await task_a() # Wait 2 seconds
result_b = await task_b() # Wait 1 second
return result_a, result_b
# Concurrent (faster - 2 seconds total)
async def concurrent():
task1 = asyncio.create_task(task_a()) # Start immediately
task2 = asyncio.create_task(task_b()) # Start immediately
result_a = await task1
result_b = await task2
return result_a, result_b
# Using gather (recommended for multiple tasks)
async def concurrent_gather():
result_a, result_b = await asyncio.gather(task_a(), task_b())
return result_a, result_b
Advanced Patterns
Semaphores for Rate Limiting
import asyncio
import aiohttp
async def fetch_with_limit(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore
) -> dict:
async with semaphore: # Limits concurrent requests
async with session.get(url) as response:
return await response.json()
async def fetch_many(urls: list[str], max_concurrent: int = 10) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
Timeouts
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def with_timeout():
try:
# Wait at most 5 seconds
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
return result
except asyncio.TimeoutError:
print("Operation timed out")
return None
# Using timeout context manager (Python 3.11+)
async def with_timeout_context():
async with asyncio.timeout(5.0):
result = await slow_operation()
return result
Queues for Producer-Consumer
import asyncio
from typing import Any
async def producer(queue: asyncio.Queue, items: list[Any]):
for item in items:
await queue.put(item)
print(f"Produced: {item}")
# Signal completion
await queue.put(None)
async def consumer(queue: asyncio.Queue, consumer_id: int):
while True:
item = await queue.get()
if item is None:
# Put sentinel back for other consumers
await queue.put(None)
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.5) # Simulate work
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue(maxsize=10)
items = list(range(20))
# Start producer and consumers
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, items))
for i in range(3):
tg.create_task(consumer(queue, i))
asyncio.run(main())
Async Generators
import asyncio
from typing import AsyncIterator
async def fetch_pages(url: str, max_pages: int = 10) -> AsyncIterator[dict]:
"""Async generator for paginated API."""
page = 1
while page <= max_pages:
# Simulate API call
await asyncio.sleep(0.1)
data = {"page": page, "items": [f"item_{i}" for i in range(10)]}
if not data["items"]:
break
yield data
page += 1
async def process_pages():
async for page_data in fetch_pages("https://api.example.com"):
print(f"Processing page {page_data['page']}")
for item in page_data["items"]:
process(item)
Async Context Managers
from contextlib import asynccontextmanager
from typing import AsyncIterator
class AsyncDatabaseConnection:
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()
return False
async def connect(self):
print("Connecting...")
await asyncio.sleep(0.1)
async def disconnect(self):
print("Disconnecting...")
await asyncio.sleep(0.1)
# Using decorator
@asynccontextmanager
async def async_session() -> AsyncIterator[dict]:
session = {"connected": True}
try:
yield session
finally:
session["connected"] = False
await asyncio.sleep(0.1) # Cleanup
# Usage
async def main():
async with AsyncDatabaseConnection() as db:
await db.query("SELECT * FROM users")
async with async_session() as session:
print(session)
Performance Optimization
Eager Task Factory (Python 3.12+)
import asyncio
async def cached_operation(key: str) -> str:
cache = {"a": "value_a", "b": "value_b"}
if key in cache:
return cache[key] # Returns synchronously
await asyncio.sleep(1) # Only if cache miss
return f"fetched_{key}"
async def main():
loop = asyncio.get_event_loop()
# Enable eager task execution
loop.set_task_factory(asyncio.eager_task_factory)
# Cached operations complete synchronously without event loop overhead
result = await cached_operation("a")
Using uvloop
# Install: pip install uvloop
import asyncio
try:
import uvloop
# 2-4x performance improvement
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass # Fall back to default event loop
async def main():
# Your async code here
pass
asyncio.run(main())
Free-Threaded asyncio (Python 3.14+)
# Python 3.14 improvements for free-threaded builds:
# - Thread-safe asyncio with lock-free data structures
# - Linear scaling with number of threads
# - 10-20% single-threaded performance improvement
# - Reduced memory usage
import asyncio
import threading
async def per_thread_loop():
"""Each thread can run its own event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
await asyncio.sleep(1)
finally:
loop.close()
# Multiple event loops in parallel (free-threaded build)
threads = [
threading.Thread(target=lambda: asyncio.run(per_thread_loop()))
for _ in range(4)
]
for t in threads:
t.start()
for t in threads:
t.join()
Common Gotchas
Blocking the Event Loop
import asyncio
import time
# BAD: Blocks the event loop
async def bad_example():
time.sleep(5) # Blocks everything!
return "done"
# GOOD: Use async sleep
async def good_example():
await asyncio.sleep(5)
return "done"
# GOOD: Run blocking code in executor
async def blocking_in_executor():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, time.sleep, 5)
return result
# For CPU-bound work, use ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
async def cpu_bound_work(data: list) -> list:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, heavy_computation, data)
return result
Creating Tasks in Wrong Context
import asyncio
# BAD: Task created outside async context
# task = asyncio.create_task(some_coroutine()) # RuntimeError!
# GOOD: Create tasks inside async function
async def main():
task = asyncio.create_task(some_coroutine())
await task
asyncio.run(main())
Forgetting to Await
import asyncio
async def fetch():
await asyncio.sleep(1)
return "data"
# BAD: Coroutine never executed
async def bad():
result = fetch() # Just creates coroutine object!
print(result) # Prints coroutine object, not "data"
# GOOD: Always await coroutines
async def good():
result = await fetch()
print(result) # Prints "data"
Exception Handling in Tasks
import asyncio
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def main():
# BAD: Exception silently lost
task = asyncio.create_task(failing_task())
await asyncio.sleep(2) # Task exception ignored
# GOOD: Always await tasks or use TaskGroup
task = asyncio.create_task(failing_task())
try:
await task
except ValueError as e:
print(f"Caught: {e}")
# BEST: Use TaskGroup (Python 3.11+)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Caught: {exc}")
Async Libraries
HTTP Clients
# aiohttp - Most popular
import aiohttp
async def fetch_aiohttp(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# httpx - Supports both sync and async
import httpx
async def fetch_httpx(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
Database
# asyncpg - PostgreSQL
import asyncpg
async def query_postgres():
conn = await asyncpg.connect("postgresql://user:pass@localhost/db")
rows = await conn.fetch("SELECT * FROM users")
await conn.close()
return rows
# aiosqlite - SQLite
import aiosqlite
async def query_sqlite():
async with aiosqlite.connect("database.db") as db:
async with db.execute("SELECT * FROM users") as cursor:
return await cursor.fetchall()
Web Frameworks
# FastAPI (built on Starlette)
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}
# Starlette
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
async def homepage(request):
return JSONResponse({"hello": "world"})
app = Starlette(routes=[Route("/", homepage)])
Additional References
For production-ready patterns beyond this guide, see:
- Async Patterns Library - Token bucket rate limiter, retry with exponential backoff, connection pools, batch processors, event bus, transaction context managers, async cache with TTL, graceful shutdown handlers
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
Didn't find tool you were looking for?