Agent skill

api-dev

Modern API development patterns for building high-performance, scalable web services. Covers async/await patterns, REST/GraphQL API design, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. The patterns are framework-agnostic and apply to Python, Node.js, Go, and other languages.

Install this agent skill to your Project

npx add-skill https://github.com/aiskillstore/marketplace/tree/main/skills/azeem-2/api-dev

SKILL.md

API Development Patterns & Best Practices

This skill provides comprehensive patterns for building modern APIs in 2025, focusing on async/await patterns, performance optimization, security, testing, and production-ready configurations that work across different frameworks and languages.

When to Use This Skill

Use this skill when you need to:

  • Design RESTful or GraphQL APIs
  • Implement async/await patterns for high performance
  • Add middleware for authentication, logging, and validation
  • Handle errors gracefully with proper HTTP status codes
  • Implement rate limiting and throttling
  • Generate OpenAPI/Swagger documentation
  • Set up comprehensive testing strategies
  • Optimize API performance with caching and connection pooling
  • Implement API versioning and backward compatibility
  • Set up monitoring and observability

Core API Design Principles

1. Async/Await Patterns for Performance

python
# patterns/async_patterns.py
import asyncio
import aiohttp
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time

@dataclass
class RequestMetrics:
    """Request metrics for monitoring"""
    duration: float
    status_code: int
    endpoint: str
    method: str
    user_id: Optional[str] = None

def with_metrics(func):
    """Decorator to add request metrics"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        status_code = 200
        endpoint = func.__name__
        method = kwargs.get('method', 'GET')

        try:
            result = await func(*args, **kwargs)
            if hasattr(result, 'status_code'):
                status_code = result.status_code
            return result
        except Exception as e:
            status_code = getattr(e, 'status_code', 500)
            raise
        finally:
            duration = time.time() - start_time
            metrics = RequestMetrics(
                duration=duration,
                status_code=status_code,
                endpoint=endpoint,
                method=method
            )
            # Send metrics to monitoring system
            await send_metrics(metrics)

    return wrapper

async def send_metrics(metrics: RequestMetrics):
    """Send metrics to monitoring system"""
    # Implementation depends on your monitoring system
    # Example: Prometheus, Datadog, or custom analytics
    pass

class AsyncAPIClient:
    """Generic async API client with connection pooling"""

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
        self._session_lock = asyncio.Lock()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session with connection pooling"""
        if self._session is None or self._session.closed:
            async with self._session_lock:
                if self._session is None or self._session.closed:
                    connector = aiohttp.TCPConnector(
                        limit=100,  # Total connection pool size
                        limit_per_host=30,  # Connections per host
                        force_close=False,
                        enable_cleanup_closed=True
                    )
                    self._session = aiohttp.ClientSession(
                        connector=connector,
                        timeout=self.timeout
                    )
        return self._session

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
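    ) -> Dict[str, Any]:
        """Make a GET request and return the decoded JSON body.

        Raises aiohttp.ClientResponseError on 4xx/5xx responses; no retries are attempted here.
        """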
        session = await self._get_session()
        url = f"{self.base_url}{endpoint}"

        async with session.get(
            url,
            params=params,
            headers=headers
        ) as response:
            response.raise_for_status()
            return await response.json()

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
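    ) -> Dict[str, Any]:
        """Make a POST request and return the decoded JSON body.

        Raises aiohttp.ClientResponseError on 4xx/5xx responses; no retries are attempted here.
        """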
        session = await self._get_session()
        url = f"{self.base_url}{endpoint}"

        async with session.post(
            url,
            data=data,
            json=json,
            headers=headers
        ) as response:
            response.raise_for_status()
            return await response.json()

    async def close(self):
        """Close the session"""
        if self._session:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
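The client above raises on the first failed request. If retries are wanted, one option is to wrap individual calls in a small helper. The sketch below is an assumption layered on top of the client, not part of it: `retry_async` and its parameters are hypothetical names, and it uses exponential backoff with full jitter.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")

async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical helper: retry `operation` with exponential backoff and jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            # Backoff doubles per attempt, is capped at max_delay, and is
            # randomised so that many clients do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

With the client above, a call might look like `await retry_async(lambda: client.get("/users"), retry_on=(aiohttp.ClientError, asyncio.TimeoutError))`. Retrying only makes sense for idempotent requests, so POST calls usually need an idempotency key or no retry at all.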

2. Circuit Breaker Pattern

python
# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for fault tolerance"""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")

            try:
                result = await func(*args, **kwargs)
                if self.state == CircuitState.HALF_OPEN:
                    self._reset()
                return result
            except self.expected_exception as e:
                self._record_failure()
                raise

        return wrapper

    def _should_attempt_reset(self) -> bool:
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

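# Usage example (also requires `import aiohttp`)
# The decorator is the CircuitBreaker instance itself, via __call__
@CircuitBreaker(failure_threshold=3, timeout=30)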
async def external_api_call():
    """External API call with circuit breaker"""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
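To make the state transitions easier to follow and to test, here is a compact variation with an injectable clock. It is a sketch under assumptions rather than a drop-in replacement: `SimpleBreaker`, `CircuitOpenError`, and the `clock` parameter are hypothetical names, and unlike the class above it clears the failure count after every successful call.

```python
import time
from typing import Any, Awaitable, Callable, Optional

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class SimpleBreaker:
    """Hypothetical minimal breaker: closed -> open -> half_open -> closed."""

    def __init__(
        self,
        failure_threshold: int = 3,
        timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.timeout:
            return "half_open"  # cool-down elapsed: allow one trial call
        return "open"

    async def call(self, operation: Callable[[], Awaitable[Any]]) -> Any:
        state = self.state
        if state == "open":
            raise CircuitOpenError("circuit is open")
        try:
            result = await operation()
        except Exception:
            self.failure_count += 1
            # A failed trial call re-opens immediately; otherwise open
            # once the threshold is reached.
            if state == "half_open" or self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self.failure_count = 0
        self.opened_at = None
        return result
```

Using `time.monotonic` as the default clock keeps the cool-down immune to wall-clock adjustments, and passing a fake clock lets tests move through the states without sleeping.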

3. Rate Limiting with Redis

python
# patterns/rate_limiting.py
import asyncio
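# The standalone aioredis package is no longer maintained; it now ships inside redis-py (>= 4.2)
from redis import asyncio as aioredis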
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimiter:
    """Rate limiter using Redis sliding window"""

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Check if request is allowed"""
        if not self.redis:
            await self.initialize()

        limit = limit or self.requests_per_minute
        window = window or self.window_size
        current_time = time.time()

        # Remove old requests from the sliding window
        await self.redis.zremrangebyscore(
            key,
            0,
            current_time - window
        )

        # Count current requests in window
        current_requests = await self.redis.zcard(key)

        if current_requests >= limit:
            return False

        # Add current request to window
        await self.redis.zadd(key, {str(current_time): current_time})
        await self.redis.expire(key, window)

        return True

class RateLimitMiddleware(BaseHTTPMiddleware):
    """FastAPI middleware for rate limiting"""

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Default identifier function"""
        # Use IP address as identifier
        return request.client.host

    async def dispatch(self, request: Request, call_next):
        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"

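        if not await self.limiter.is_allowed(key):
            # An HTTPException raised inside BaseHTTPMiddleware bypasses FastAPI's
            # exception handlers and surfaces as a 500, so return the 429 directly.
            # (Imported locally here; move it to the module imports if preferred.)
            from starlette.responses import JSONResponse
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"}
            )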

        return await call_next(request)
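The sliding-window logic can be exercised without Redis. The following is a single-process sketch of the same algorithm, intended for unit tests or local development; `InMemorySlidingWindowLimiter` and its `clock` parameter are hypothetical names, and it is not a substitute for a shared store when several workers serve traffic.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict

class InMemorySlidingWindowLimiter:
    """Hypothetical single-process stand-in for the Redis-backed limiter."""

    def __init__(
        self,
        limit: int = 100,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window = window
        self.clock = clock
        # One deque of request timestamps per key, oldest first
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have aged out of the window
        # (mirrors ZREMRANGEBYSCORE key 0 now-window, which is inclusive).
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

Rejected requests are not recorded, matching the Redis version, so a client that keeps hammering the endpoint is allowed again as soon as its oldest accepted request leaves the window.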

4. API Versioning Strategy

python
# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod

class APIVersion(str, Enum):
    V1 = "v1"
    V2 = "v2"
    V3 = "v3"

class APIVersionStrategy(ABC):
    """Base class for API versioning strategies"""

    @abstractmethod
    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Extract version from request"""
        pass

class HeaderVersionStrategy(APIVersionStrategy):
    """Version from header strategy"""

    def __init__(self, header_name: str = "API-Version"):
        self.header_name = header_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        version = request.headers.get(self.header_name)
        # Look up by enum value ("v1"); `__members__` is keyed by name ("V1")
        try:
            return APIVersion(version)
        except ValueError:
            return None

class URLPathVersionStrategy(APIVersionStrategy):
    """Version from URL path strategy"""

    def __init__(self, prefix: str = "/api"):
        self.prefix = prefix

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        path = request.url.path
        if not path.startswith(self.prefix):
            return None

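        # First path segment after the prefix, e.g. "/api/v2/users" -> "v2"
        version_part = path[len(self.prefix):].strip("/").split("/")[0]
        # Look up by enum value ("v1"); `__members__` is keyed by name ("V1")
        try:
            return APIVersion(version_part)
        except ValueError:
            return None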

class QueryParamVersionStrategy(APIVersionStrategy):
    """Version from query parameter strategy"""

    def __init__(self, param_name: str = "version"):
        self.param_name = param_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        version = request.query_params.get(self.param_name)
        # Look up by enum value ("v1"); `__members__` is keyed by name ("V1")
        try:
            return APIVersion(version)
        except ValueError:
            return None

class CompositeVersionStrategy(APIVersionStrategy):
    """Composite version strategy that tries multiple strategies"""

    def __init__(self, strategies: list[APIVersionStrategy]):
        self.strategies = strategies
        self.default_version = APIVersion.V1

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        for strategy in self.strategies:
            version = strategy.get_version_from_request(request)
            if version:
                return version
        return self.default_version

# Version-specific handlers
class APIHandlerRegistry:
    """Registry for version-specific API handlers"""

    def __init__(self):
        self.handlers: Dict[APIVersion, Dict[str, Any]] = {
            version: {} for version in APIVersion
        }

    def register_handler(
        self,
        version: APIVersion,
        endpoint: str,
        handler: Any
    ):
        """Register handler for specific version"""
        if version not in self.handlers:
            self.handlers[version] = {}
        self.handlers[version][endpoint] = handler

    def get_handler(self, version: APIVersion, endpoint: str) -> Optional[Any]:
        """Get handler for version and endpoint"""
        return self.handlers.get(version, {}).get(endpoint)

# Usage example
version_strategy = CompositeVersionStrategy([
    HeaderVersionStrategy(),
    URLPathVersionStrategy(),
    QueryParamVersionStrategy()
])
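The precedence order (header, then URL path, then query parameter, then default) can be checked against stand-in request objects. The snippet below is a condensed, function-based sketch of that lookup, not the strategy classes themselves; `parse_version`, `resolve_version`, and `fake_request` are hypothetical helpers, and the fake request only mimics the three attributes the strategies read.

```python
from enum import Enum
from types import SimpleNamespace
from typing import Dict, Optional

class APIVersion(str, Enum):
    V1 = "v1"
    V2 = "v2"

def parse_version(raw: Optional[str]) -> Optional[APIVersion]:
    """Look up by enum value ("v1"); unknown or missing input gives None."""
    try:
        return APIVersion(raw)
    except ValueError:
        return None

def resolve_version(
    request,
    prefix: str = "/api",
    default: APIVersion = APIVersion.V1,
) -> APIVersion:
    """Try header, then path, then query parameter, then fall back to default."""
    path = request.url.path
    path_segment = None
    if path.startswith(prefix):
        # "/api/v2/users" -> "v2"; "/api" -> ""
        path_segment = path[len(prefix):].strip("/").split("/")[0]
    candidates = (
        request.headers.get("API-Version"),
        path_segment,
        request.query_params.get("version"),
    )
    for raw in candidates:
        version = parse_version(raw)
        if version is not None:
            return version
    return default

def fake_request(
    path: str = "/",
    headers: Optional[Dict[str, str]] = None,
    query: Optional[Dict[str, str]] = None,
):
    """Stand-in exposing only .headers, .url.path and .query_params."""
    return SimpleNamespace(
        headers=headers or {},
        url=SimpleNamespace(path=path),
        query_params=query or {},
    )
```

For example, `resolve_version(fake_request("/api/v2/users"))` yields `APIVersion.V2`, while adding `headers={"API-Version": "v1"}` to the same request yields `APIVersion.V1` because the header is consulted first.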

5. OpenAPI Documentation Enhancement

python
# patterns/openapi.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from enum import Enum

class ErrorSchema(BaseModel):
    """Standard error response schema"""
    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Error message")
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional error details"
    )
    timestamp: datetime = Field(
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        default_factory=lambda: datetime.now(timezone.utc),
        description="Error timestamp"
    )

class PaginationLinks(BaseModel):
    """Pagination links schema"""
    first: Optional[str] = Field(None, description="First page link")
    last: Optional[str] = Field(None, description="Last page link")
    next: Optional[str] = Field(None, description="Next page link")
    prev: Optional[str] = Field(None, description="Previous page link")

class PaginationMeta(BaseModel):
    """Pagination metadata schema"""
    total: int = Field(..., description="Total number of items")
    page: int = Field(..., description="Current page number")
    per_page: int = Field(..., description="Items per page")
    pages: int = Field(..., description="Total number of pages")

class PaginatedResponse(BaseModel):
    """Generic paginated response schema"""
    data: List[Any] = Field(..., description="Response data")
    meta: PaginationMeta = Field(..., description="Pagination metadata")
    links: PaginationLinks = Field(..., description="Pagination links")

# Enhanced OpenAPI configuration
OPENAPI_CONFIG = {
    "title": "Modern API",
    "description": "A modern REST API with async patterns",
    "version": "1.0.0",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
    "servers": [
        {
            "url": "https://api.example.com/v1",
            "description": "Production server"
        },
        {
            "url": "https://staging-api.example.com/v1",
            "description": "Staging server"
        }
    ],
    "components": {
        "schemas": {
            "Error": ErrorSchema.model_json_schema(),
            "PaginatedResponse": PaginatedResponse.model_json_schema()
        },
        "responses": {
            "ValidationError": {
                "description": "Validation error",
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                }
            },
            "UnauthorizedError": {
                "description": "Unauthorized error",
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                }
            },
            "NotFoundError": {
                "description": "Resource not found",
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                }
            },
            "RateLimitError": {
                "description": "Rate limit exceeded",
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                }
            }
        }
    }
}
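The pagination schemas describe the shape of a response but not how the values are derived. A minimal sketch of that calculation follows, using plain dictionaries so it runs without pydantic; `build_pagination` is a hypothetical helper, and the `page`/`per_page` query parameter names are assumptions taken from the schema field names.

```python
import math
from typing import Any, Dict, Optional
from urllib.parse import urlencode

def build_pagination(base_url: str, total: int, page: int, per_page: int) -> Dict[str, Any]:
    """Hypothetical helper: compute `meta` and `links` for a paginated response."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    # An empty collection still has one (empty) page
    pages = max(1, math.ceil(total / per_page))

    def link(target: int) -> Optional[str]:
        # Pages outside 1..pages have no link
        if target < 1 or target > pages:
            return None
        return f"{base_url}?{urlencode({'page': target, 'per_page': per_page})}"

    return {
        "meta": {"total": total, "page": page, "per_page": per_page, "pages": pages},
        "links": {
            "first": link(1),
            "last": link(pages),
            "next": link(page + 1),
            "prev": link(page - 1),
        },
    }
```

The resulting dictionaries have the same keys as `PaginationMeta` and `PaginationLinks`, so they can be passed to those models as keyword arguments.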

6. Testing Patterns for APIs

python
# patterns/testing.py
import pytest
import asyncio
from typing import AsyncGenerator
import httpx
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch

class AsyncAPITestCase:
    """Base class for async API tests"""

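    # Subclasses are expected to set `app` to the ASGI application under test.
    # In pytest-asyncio's strict mode, declare this with @pytest_asyncio.fixture instead.
    @pytest.fixture(scope="class")
    async def async_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Create async test client"""
        # httpx removed the `app=` shortcut in 0.28; pass an ASGITransport instead
        async with httpx.AsyncClient(
            transport=httpx.ASGITransport(app=self.app),
            base_url="http://test"
        ) as client:
            yield client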

    @pytest.fixture
    def mock_external_service(self):
        """Mock external service"""
        with patch("external_api_client.ExternalAPIClient") as mock:
            client = mock.return_value
            client.get.return_value = {"status": "ok"}
            yield client

# Example test cases
@pytest.mark.asyncio
class TestUserAPI(AsyncAPITestCase):
    """Test user API endpoints"""

    async def test_create_user(self, async_client: httpx.AsyncClient):
        """Test user creation"""
        user_data = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123"
        }

        response = await async_client.post(
            "/api/users",
            json=user_data
        )

        assert response.status_code == 201
        data = response.json()
        assert data["email"] == user_data["email"]
        assert data["username"] == user_data["username"]
        assert "password" not in data

    async def test_get_user(self, async_client: httpx.AsyncClient):
        """Test get user"""
        # First create a user
        create_response = await async_client.post(
            "/api/users",
            json={
                "email": "test2@example.com",
                "username": "testuser2",
                "password": "securepassword123"
            }
        )
        user_id = create_response.json()["id"]

        # Get user
        response = await async_client.get(f"/api/users/{user_id}")
        assert response.status_code == 200
        data = response.json()
        assert data["id"] == user_id

    async def test_list_users_with_pagination(
        self,
        async_client: httpx.AsyncClient
    ):
        """Test user listing with pagination"""
        response = await async_client.get("/api/users?page=1&per_page=10")
        assert response.status_code == 200
        data = response.json()
        assert "data" in data
        assert "meta" in data
        assert "links" in data
        assert data["meta"]["page"] == 1
        assert data["meta"]["per_page"] == 10

    async def test_rate_limiting(
        self,
        async_client: httpx.AsyncClient
    ):
        """Test rate limiting"""
        # Make multiple requests quickly
        responses = []
        for _ in range(150):  # Assuming rate limit is 100/minute
            response = await async_client.get("/api/users")
            responses.append(response)

        # Check if rate limiting kicked in
        rate_limited = any(
            r.status_code == 429 for r in responses
        )
        assert rate_limited

# Integration tests
@pytest.mark.asyncio
async def test_full_user_flow():
    """Test complete user workflow"""
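    # `app` is the ASGI application under test, imported from your project.
    # httpx removed the `app=` shortcut in 0.28; pass an ASGITransport instead.
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        base_url="http://test"
    ) as client: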

        # Create user
        user_data = {
            "email": "flowtest@example.com",
            "username": "flowtest",
            "password": "securepassword123"
        }
        create_resp = await client.post("/api/users", json=user_data)
        assert create_resp.status_code == 201
        user = create_resp.json()
        user_id = user["id"]

        # Get user
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["id"] == user_id

        # Update user
        update_data = {"username": "updateduser"}
        update_resp = await client.patch(
            f"/api/users/{user_id}",
            json=update_data
        )
        assert update_resp.status_code == 200
        assert update_resp.json()["username"] == "updateduser"

        # Delete user
        delete_resp = await client.delete(f"/api/users/{user_id}")
        assert delete_resp.status_code == 204

        # Verify user is deleted
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 404

7. Performance Optimization Patterns

python
# patterns/performance.py
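import asyncio
# The standalone aioredis package is no longer maintained; it now ships inside redis-py (>= 4.2)
from redis import asyncio as aioredis
from typing import Any, Callable, Dict, Optional
from functools import wraps
import hashlib
import json
import pickle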

class ResponseCache:
    """Response caching with Redis"""

    def __init__(self, redis_url: str, default_ttl: int = 300):
        self.redis = None
        self.redis_url = redis_url
        self.default_ttl = default_ttl

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    def _make_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate cache key from arguments"""
        key_data = {
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        if not self.redis:
            await self.initialize()

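        cached = await self.redis.get(key)
        if cached is not None:
            # pickle can execute arbitrary code on load: only use this with a
            # Redis instance you control, or switch to JSON for untrusted data
            return pickle.loads(cached)
        return None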

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Set cached value"""
        if not self.redis:
            await self.initialize()

        ttl = ttl or self.default_ttl
        serialized = pickle.dumps(value)
        await self.redis.setex(key, ttl, serialized)

    async def delete(self, key: str):
        """Delete cached value"""
        if self.redis:
            await self.redis.delete(key)

def cache_response(
    prefix: str,
    ttl: Optional[int] = None,
    cache: Optional[ResponseCache] = None
):
    """Decorator to cache response"""
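    def decorator(func: Callable) -> Callable:
        # Create the fallback cache once per decorated function rather than on
        # every call, so the Redis connection is reused
        cache_instance = cache or ResponseCache("redis://localhost")

        @wraps(func)
        async def wrapper(*args, **kwargs):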
            # Generate cache key
            cache_key = cache_instance._make_cache_key(prefix, *args, **kwargs)

            # Try to get from cache
            cached_result = await cache_instance.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            await cache_instance.set(cache_key, result, ttl)

            return result

        return wrapper
    return decorator

# Usage example
@cache_response(prefix="user_data", ttl=300)
async def get_user_data(user_id: int) -> Dict[str, Any]:
    """Get user data with caching"""
    # Expensive database operation or API call
    await asyncio.sleep(1)  # Simulate slow operation
    return {"id": user_id, "name": "John Doe", "email": "john@example.com"}

# Batch processing for performance
class BatchProcessor:
    """Batch processor for optimizing multiple operations"""

    def __init__(self, batch_size: int = 100, flush_interval: int = 5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = asyncio.Queue()
        self.processing = False

    async def add(self, item: Any):
        """Add item to batch queue"""
        await self.queue.put(item)
        if not self.processing:
            self.processing = True
            # Keep a reference so the background task is not garbage-collected
            self._worker = asyncio.create_task(self._process_batch())

    async def _process_batch(self):
        """Process items in batches"""
        while True:
            batch = []

            # Collect batch items
            try:
                # get_running_loop() is the supported call inside a coroutine
                loop = asyncio.get_running_loop()
                deadline = loop.time() + self.flush_interval

                while len(batch) < self.batch_size:
                    try:
                        timeout = max(0, deadline - loop.time())
                        item = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=timeout
                        )
                        batch.append(item)
                    except asyncio.TimeoutError:
                        break

                if batch:
                    await self._process_batch_items(batch)

            except Exception as e:
                print(f"Error processing batch: {e}")
                continue

    async def _process_batch_items(self, batch: list[Any]):
        """Process a batch of items"""
        # Override in subclasses
        # Example: batch database insert, batch API calls, etc.
        print(f"Processing batch of {len(batch)} items")
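For tests or single-process services, the same caching decorator pattern can be tried without Redis. The sketch below keeps entries in a dictionary with an expiry time; `memory_cache` and its `clock` parameter are hypothetical names. Storing an `(expires_at, value)` tuple means a cached `None` result is still recognised as a hit.

```python
import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable, Dict, Tuple

def memory_cache(ttl: float = 300.0, clock: Callable[[], float] = time.monotonic):
    """Hypothetical in-process TTL cache for async functions."""
    def decorator(func: Callable) -> Callable:
        store: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Same key scheme as the Redis version: hash of the JSON-encoded arguments
            payload = json.dumps(
                {"args": args, "kwargs": kwargs}, sort_keys=True, default=str
            )
            key = hashlib.sha256(payload.encode()).hexdigest()

            now = clock()
            entry = store.get(key)
            if entry is not None and entry[0] > now:
                return entry[1]  # fresh hit, even if the cached value is None

            result = await func(*args, **kwargs)
            store[key] = (now + ttl, result)
            return result

        # Expose a way to drop all entries, e.g. between tests
        wrapper.cache_clear = store.clear
        return wrapper
    return decorator
```

Expired entries are only replaced when the same key is requested again, so this sketch suits a bounded key space; a long-running service with many distinct keys would need eviction as well.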

8. Production Configuration Checklist

yaml
# api/production_checklist.yaml
performance:
  async_patterns:
    connection_pooling: true
    max_connections: 100
    connection_timeout: 30
    keep_alive: true

  caching:
    redis_cache: true
    default_ttl: 300
    cache_headers: true

  compression:
    gzip: true
    level: 6
    threshold: 1024

  rate_limiting:
    enabled: true
    default_limit: 100/minute
    burst_limit: 200
    sliding_window: true

security:
  authentication:
    jwt_validation: true
    token_refresh: true
    revoke_tokens: true

  authorization:
    rbac: true
    rate_limit_by_role: true

  validation:
    input_validation: true
    sql_injection_protection: true
    xss_protection: true

  headers:
    security_headers: true
    cors: true
    csrf_protection: true

monitoring:
  metrics:
    prometheus: true
    request_duration: true
    error_rate: true
    throughput: true

  logging:
    structured_logging: true
    correlation_ids: true
    error_tracking: true

  health_checks:
    liveness_probe: true
    readiness_probe: true
    dependency_checks: true

documentation:
  openapi:
    auto_generation: true
    examples: true
    schemas: true

  versioning:
    strategy: "header_path_query"
    deprecation_warnings: true
    backward_compatibility: true

testing:
  unit_tests: true
  integration_tests: true
  performance_tests: true
  contract_tests: true
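A value such as `default_limit: 100/minute` is loaded from YAML as a plain string, so it has to be parsed before it can configure a limiter. One possible parser is sketched below; `parse_rate` and the set of accepted units are assumptions, not part of the checklist format.

```python
import re
from typing import Tuple

# Seconds per supported unit (an assumed set; extend as needed)
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}

def parse_rate(spec: str) -> Tuple[int, int]:
    """Turn "100/minute" into (limit, window_in_seconds)."""
    match = re.fullmatch(r"\s*(\d+)\s*/\s*(second|minute|hour|day)\s*", spec)
    if match is None:
        raise ValueError(f"unrecognised rate limit: {spec!r}")
    return int(match.group(1)), _UNIT_SECONDS[match.group(2)]
```

The returned pair maps directly onto a request limit and a window size in seconds, for instance the `requests_per_minute` and `window_size` arguments of the rate limiter shown earlier.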

Taken together, these patterns cover async I/O, circuit breakers, rate limiting, versioning, OpenAPI documentation, testing, and production tuning. The examples are written in Python, but the designs carry over to other frameworks and languages.
