Agent skill
configuring-dapr-pubsub
Configures Dapr pub/sub components for event-driven microservices with Kafka or Redis. Use when wiring agent-to-agent communication, setting up event subscriptions, or integrating Dapr sidecars. Covers component configuration, subscription patterns, publishing events, and Kubernetes deployment. NOT when using direct Kafka clients or non-Dapr messaging patterns.
Install this agent skill to your Project
npx add-skill https://github.com/aiskillstore/marketplace/tree/main/skills/asmayaseen/configuring-dapr-pubsub
SKILL.md
Configuring Dapr Pub/Sub
Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.
Quick Start
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
- name: disableTls
value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml
# Test with Dapr CLI
dapr run --app-id publisher -- dapr publish --pubsub pubsub --topic test --data '{"msg":"hello"}'
Component Configurations
Kafka (Production)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
# Required
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
# Consumer settings
- name: consumerGroup
value: "{namespace}-{appId}" # Templated per deployment
- name: consumeRetryInterval
value: "100ms"
- name: heartbeatInterval
value: "3s"
- name: sessionTimeout
value: "10s"
# Performance
- name: maxMessageBytes
value: "1048576" # 1MB
- name: channelBufferSize
value: "256"
Kafka with SASL Authentication
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-secure
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka.example.com:9093"
- name: authType
value: "password"
- name: saslUsername
value: "dapr-user"
- name: saslPassword
secretKeyRef:
name: kafka-secrets
key: password
- name: saslMechanism
value: "SCRAM-SHA-256"
Redis (Development/Simple)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "redis-master.redis.svc.cluster.local:6379"
- name: redisPassword
secretKeyRef:
name: redis-secrets
key: password
Subscription Patterns
Declarative Subscription (Recommended)
# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-created-subscription
spec:
pubsubname: pubsub
topic: task-created
routes:
default: /dapr/task-created
scopes:
- triage-agent
- concepts-agent
Programmatic Subscription (FastAPI)
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/dapr/subscribe")
async def subscribe():
"""Dapr calls this to discover subscriptions."""
return [
{
"pubsubname": "pubsub",
"topic": "task-created",
"route": "/dapr/task-created"
},
{
"pubsubname": "pubsub",
"topic": "task-completed",
"route": "/dapr/task-completed"
}
]
@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
"""Handle incoming CloudEvent."""
event = await request.json()
# CloudEvent wrapper - data is nested
task_data = event.get("data", event)
task_id = task_data.get("task_id")
# Process event
print(f"Task created: {task_id}")
return {"status": "SUCCESS"}
Publishing Events
From FastAPI Service
import httpx
DAPR_URL = "http://localhost:3500"
async def publish_event(topic: str, data: dict):
"""Publish event through Dapr sidecar."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
# Usage
await publish_event("task-created", {
"task_id": "123",
"title": "Learn Python",
"user_id": "user-456"
})
With CloudEvent Metadata
async def publish_cloudevent(topic: str, data: dict, event_type: str):
"""Publish with explicit CloudEvent fields."""
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={
"Content-Type": "application/cloudevents+json",
"ce-specversion": "1.0",
"ce-type": event_type,
"ce-source": "triage-agent",
"ce-id": str(uuid.uuid4())
}
)
Kubernetes Deployment
Component Scoping
Limit component access to specific apps:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka:9092"
scopes:
- triage-agent
- concepts-agent
- debug-agent
App Deployment with Dapr Sidecar
apiVersion: apps/v1
kind: Deployment
metadata:
name: triage-agent
spec:
replicas: 2
selector:
matchLabels:
app: triage-agent
template:
metadata:
labels:
app: triage-agent
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "triage-agent"
dapr.io/app-port: "8000"
dapr.io/enable-api-logging: "true"
spec:
containers:
- name: triage-agent
image: myapp/triage-agent:latest
ports:
- containerPort: 8000
env:
- name: DAPR_HTTP_PORT
value: "3500"
Multi-Agent Routing Pattern
Triage Agent → Specialist Agents
# triage_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.post("/api/question")
async def handle_question(request: Request):
data = await request.json()
question = data["question"]
# Route based on content
if "python" in question.lower() or "code" in question.lower():
topic = "concepts-request"
elif "error" in question.lower() or "bug" in question.lower():
topic = "debug-request"
else:
topic = "concepts-request" # Default
# Publish to appropriate agent
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json={
"question": question,
"user_id": data["user_id"],
"session_id": data["session_id"]
}
)
return {"status": "routed", "topic": topic}
Specialist Agent Handler
# concepts_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.get("/dapr/subscribe")
async def subscribe():
return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]
@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
event = await request.json()
data = event.get("data", event)
# Process with LLM
response = await process_with_llm(data["question"])
# Publish response
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
json={
"session_id": data["session_id"],
"response": response,
"agent": "concepts"
}
)
return {"status": "SUCCESS"}
Local Development
Run with Dapr CLI
# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
--resources-path ./components -- uvicorn concepts:app --port 8001
# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
--resources-path ./components -- uvicorn triage:app --port 8000
Docker Compose with Dapr
version: "3.8"
services:
triage-agent:
build: ./services/triage
ports:
- "8000:8000"
triage-agent-dapr:
image: daprio/daprd:latest
command: ["./daprd",
"--app-id", "triage-agent",
"--app-port", "8000",
"--dapr-http-port", "3500",
"--resources-path", "/components"
]
volumes:
- ./components:/components
network_mode: "service:triage-agent"
depends_on:
- triage-agent
kafka:
image: confluentinc/cp-kafka:latest
# ... kafka config
Troubleshooting
Check Dapr Sidecar
# View sidecar logs
kubectl logs deploy/triage-agent -c daprd
# Check component registration
curl http://localhost:3500/v1.0/metadata
Common Issues
| Error | Cause | Fix |
|---|---|---|
component not found |
Component not loaded | Check --resources-path or K8s namespace |
connection refused |
Kafka not reachable | Verify broker address in component |
consumer group rebalance |
Multiple instances | Use unique consumerGroup per app |
event not received |
Wrong topic/route | Check subscription config |
Debug Event Flow
# Publish test event
dapr publish --pubsub pubsub --topic test --data '{"test": true}'
# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
Verification
Run: python scripts/verify.py
Related Skills
deploying-kafka-k8s- Kafka cluster setup with Strimziscaffolding-fastapi-dapr- FastAPI services with Daprscaffolding-openai-agents- Agent orchestration patterns
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
perigon-backend
Perigon ASP.NET Core + EF Core + Aspire conventions
perigon-agent
Pointers for Copilot/agents to apply Perigon conventions
perigon-angular
Angular 21+ standalone/Material/signal conventions for Perigon WebApp
fastapi-mastery
Comprehensive FastAPI development skill covering REST API creation, routing, request/response handling, validation, authentication, database integration, middleware, and deployment. Use when working with FastAPI projects, building APIs, implementing CRUD operations, setting up authentication/authorization, integrating databases (SQL/NoSQL), adding middleware, handling WebSockets, or deploying FastAPI applications. Triggered by requests involving .py files with FastAPI code, API endpoint creation, Pydantic models, or FastAPI-specific features.
context7-efficient
Token-efficient library documentation fetcher using Context7 MCP with 86.8% token savings through intelligent shell pipeline filtering. Fetches code examples, API references, and best practices for JavaScript, Python, Go, Rust, and other libraries. Use when users ask about library documentation, need code examples, want API usage patterns, are learning a new framework, need syntax reference, or troubleshooting with library-specific information. Triggers include questions like "Show me React hooks", "How do I use Prisma", "What's the Next.js routing syntax", or any request for library/framework documentation.
browser-use
Browser automation using Playwright MCP. Navigate websites, fill forms, click elements, take screenshots, and extract data. Use when tasks require web browsing, form submission, web scraping, UI testing, or any browser interaction.
Didn't find tool you were looking for?