Agent skill

building-with-kafka-strimzi

Use when building event-driven systems with Apache Kafka on Kubernetes. Triggers include EDA patterns, Kafka producers/consumers, Strimzi operator deployment, Schema Registry, transactions, exactly-once semantics. NOT for general messaging (use Dapr pub/sub for abstraction).

Install this agent skill in your project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/building-with-kafka-strimzi

SKILL.md

Building Event-Driven Systems with Kafka & Strimzi

Production-ready event streaming on Kubernetes using Apache Kafka with the Strimzi operator.

Persona

You are a Kafka and event-driven architecture expert with production Kubernetes experience. You understand:

  • Event-driven architecture patterns (events vs commands, eventual consistency)
  • Apache Kafka internals (brokers, partitions, consumer groups, offsets)
  • Strimzi operator for Kubernetes-native Kafka deployment
  • confluent-kafka-python for high-performance Python clients
  • Schema Registry and Avro for event schema management
  • Exactly-once semantics and transactional patterns

When to Use

  • Building event-driven microservices
  • Deploying Kafka on Kubernetes with Strimzi
  • Implementing reliable producers with delivery guarantees
  • Managing consumer groups and offset handling
  • Schema evolution with Avro and Schema Registry
  • Change data capture with Debezium
  • Transactional event processing

Core Concepts

Event-Driven Architecture

| Concept | Description |
| --- | --- |
| Events | Immutable facts about past occurrences (e.g., "OrderCreated") |
| Commands | Requests to perform actions (e.g., "CreateOrder") |
| Eventual Consistency | Systems converge to a consistent state over time |
| Event Sourcing | Capture state changes as a sequence of events |
| CQRS | Separate command and query processing |
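
As a rough illustration of the concepts above, the sketch below models an event as an immutable record and rebuilds state by replaying an event sequence, which is the core idea of event sourcing. The `TaskEvent` class, the event type strings, and the `replay` function are hypothetical names chosen for this example and are not part of any library.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)  # frozen: events are immutable facts about the past
class TaskEvent:
    event_type: str  # e.g. "task.created" (hypothetical naming)
    task_id: str
    data: Dict[str, str] = field(default_factory=dict)


def replay(events: List[TaskEvent]) -> Dict[str, Dict[str, str]]:
    """Rebuild current task state by folding over the event sequence."""
    state: Dict[str, Dict[str, str]] = {}
    for event in events:
        if event.event_type == "task.created":
            state[event.task_id] = dict(event.data, status="open")
        elif event.event_type == "task.completed":
            state[event.task_id]["status"] = "done"
        elif event.event_type == "task.deleted":
            state.pop(event.task_id, None)
    return state


history = [
    TaskEvent("task.created", "task-123", {"title": "Buy groceries"}),
    TaskEvent("task.completed", "task-123"),
]
current_state = replay(history)
```

A command such as "CreateOrder" would be validated first and may be rejected; only the resulting event is appended to the history.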

Kafka Architecture

┌─────────────────────────────────────────────────────────────┐
│  Kafka Cluster (KRaft Mode - No ZooKeeper)                  │
├─────────────────────────────────────────────────────────────┤
│  Controller Nodes (metadata via Raft)                       │
│   └─ __cluster_metadata topic                               │
├─────────────────────────────────────────────────────────────┤
│  Broker Nodes (message processing)                          │
│   └─ Topics → Partitions → Segments                         │
├─────────────────────────────────────────────────────────────┤
│  Producers → Topics ← Consumer Groups                       │
│   └─ Partition assignment, offset tracking                  │
└─────────────────────────────────────────────────────────────┘
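
The diagram's "partition assignment" step can be pictured as hashing the message key and taking the result modulo the partition count. The sketch below is a conceptual illustration only: `partition_for` is a hypothetical helper, and real Kafka clients use their own partitioners, so this function is not expected to reproduce actual placements.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition, which is what
# preserves ordering for all events that share a key.
first = partition_for(b"task-123", 3)
second = partition_for(b"task-123", 3)
```

One consequence of this scheme is that changing the partition count of a topic changes where existing keys land, so per-key ordering across the change is not guaranteed.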

Strimzi Components

| Component | Role |
| --- | --- |
| Cluster Operator | Manages Kafka cluster lifecycle |
| Entity Operator | Contains Topic + User operators |
| Topic Operator | Manages KafkaTopic CRs |
| User Operator | Manages KafkaUser CRs and credentials |

Decision Logic

| Situation | Pattern | Why |
| --- | --- | --- |
| Critical data | acks=all + idempotent producer | Durability over speed |
| High throughput | acks=1 + batching | Balance speed/safety |
| Atomic multi-topic writes | Transactions | All-or-nothing |
| Schema evolution | Avro + Schema Registry | Backward compatibility |
| Database sync | Debezium CDC + Outbox | Transactional integrity |
| Consumer scaling | Consumer groups | Parallel processing |
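
The first two rows of the table can be expressed as producer configuration profiles. This is a minimal sketch: `producer_config` and the profile names are hypothetical, and the `linger.ms` and `compression.type` values are assumptions picked for illustration rather than tuned recommendations.

```python
from typing import Dict, Union

ConfigValue = Union[str, int, bool]


def producer_config(profile: str, bootstrap_servers: str) -> Dict[str, ConfigValue]:
    """Return a producer config dict for a named durability profile."""
    config: Dict[str, ConfigValue] = {"bootstrap.servers": bootstrap_servers}
    if profile == "critical":
        # Durability over speed: wait for all in-sync replicas, dedupe retries
        config.update({"acks": "all", "enable.idempotence": True})
    elif profile == "throughput":
        # Balance speed and safety: leader-only acks plus batching (assumed values)
        config.update({"acks": 1, "linger.ms": 20, "compression.type": "lz4"})
    else:
        raise ValueError(f"unknown profile: {profile}")
    return config


critical = producer_config("critical", "task-events-kafka-bootstrap:9092")
throughput = producer_config("throughput", "task-events-kafka-bootstrap:9092")
```

The returned dict is intended to be passed to `confluent_kafka.Producer(...)`.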

Strimzi Deployment on Kubernetes

Install Strimzi Operator

bash
# Add Strimzi Helm repo
helm repo add strimzi https://strimzi.io/charts/
helm repo update

# Install operator
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
  --namespace kafka --create-namespace

Create Kafka Cluster (KRaft Mode)

yaml
# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: task-events
spec:
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: ephemeral  # Use persistent-claim for production
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: task-events
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.8.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  entityOperator:
    topicOperator: {}
    userOperator: {}

bash
kubectl apply -f kafka-cluster.yaml -n kafka

Create Topics via CRD

yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: task-created
  labels:
    strimzi.io/cluster: task-events
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: "604800000"  # 7 days
    cleanup.policy: delete
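
When one topic per event type is wanted, the manifest above can be generated rather than copied by hand. The sketch below builds equivalent KafkaTopic objects as Python dicts and serializes them to JSON; `kafka_topic_manifest` is a hypothetical helper, and the list of event names is taken from the naming convention used later in this document.

```python
import json
from typing import Any, Dict


def kafka_topic_manifest(name: str, cluster: str, partitions: int = 3,
                         replicas: int = 1,
                         retention_ms: int = 604800000) -> Dict[str, Any]:
    """Build a KafkaTopic resource mirroring the YAML example above."""
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaTopic",
        "metadata": {"name": name, "labels": {"strimzi.io/cluster": cluster}},
        "spec": {
            "partitions": partitions,
            "replicas": replicas,
            "config": {
                "retention.ms": str(retention_ms),  # 7 days by default
                "cleanup.policy": "delete",
            },
        },
    }


manifests = [
    kafka_topic_manifest(f"task-{suffix}", "task-events")
    for suffix in ("created", "updated", "completed", "deleted")
]
# A v1 List wraps several objects so they fit in a single file
manifest_json = json.dumps(
    {"apiVersion": "v1", "kind": "List", "items": manifests}, indent=2
)
```

The resulting JSON could be written to a file and applied with `kubectl apply -f`, since kubectl accepts JSON as well as YAML.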

Create Users via CRD

yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: task-api
  labels:
    strimzi.io/cluster: task-events
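spec:
  authentication:
    type: scram-sha-512  # Needs a listener with scram-sha-512 authentication enabled
  authorization:
    type: simple  # Needs spec.kafka.authorization.type: simple in the Kafka CR
    acls:
      - resource:
          type: topic
          name: task-  # With patternType: prefix the name is a literal prefix, not a wildcard
          patternType: prefix
        operations: [Read, Write, Describe]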

Python Producer Patterns

Basic Producer with Delivery Reports

python
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Callback triggered by poll() or flush()"""
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer = Producer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'client.id': 'task-api',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 5,
    'delivery.timeout.ms': 30000
})

# Async produce (default)
producer.produce(
    topic='task-created',
    key='task-123',
    value='{"id": "task-123", "title": "Buy groceries"}',
    callback=delivery_report
)

# Service callbacks
producer.poll(0)

# Flush before shutdown
producer.flush()
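
`produce()` only enqueues the message locally, and it raises `BufferError` when that local queue is full. The sketch below shows one way to handle that by serving callbacks and retrying. `publish_event` is a hypothetical helper, the retry count is an assumption, and the producer is passed in so that any object with `produce()` and `poll()` methods works.

```python
import json
from typing import Any, Callable, Dict, Optional


def publish_event(producer: Any, topic: str, key: str, event: Dict[str, Any],
                  on_delivery: Optional[Callable] = None,
                  max_attempts: int = 3) -> None:
    """Serialize an event to JSON and enqueue it, backing off if the queue is full."""
    kwargs: Dict[str, Any] = {"key": key, "value": json.dumps(event).encode("utf-8")}
    if on_delivery is not None:
        kwargs["callback"] = on_delivery

    for _ in range(max_attempts):
        try:
            producer.produce(topic, **kwargs)
            break
        except BufferError:
            # Local queue is full: serve delivery callbacks to free space, then retry
            producer.poll(1.0)
    else:
        raise BufferError(f"local queue still full after {max_attempts} attempts")

    producer.poll(0)  # Serve any pending delivery callbacks without blocking
```

A call might look like `publish_event(producer, 'task-created', 'task-123', {"id": "task-123"}, delivery_report)`, reusing the producer and callback defined above.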

Idempotent Producer (Exactly-Once)

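python
from confluent_kafka import Producer

# Idempotence deduplicates producer retries per partition;
# end-to-end exactly-once additionally needs transactions (see below).
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # Prevents duplicates on retry
    'acks': 'all',               # Wait for all in-sync replicas
    'max.in.flight.requests.per.connection': 5,  # Max for idempotence
    'retries': 2147483647        # Retry indefinitely within delivery.timeout.ms
})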

Python Consumer Patterns

Basic Consumer with Manual Commit

python
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'task-events-kafka-bootstrap:9092',
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Manual commit for at-least-once
})

consumer.subscribe(['task-created'])

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        # Process message
        print(f'Received: {msg.value().decode()}')

        # Commit after successful processing
        consumer.commit(message=msg)

finally:
    consumer.close()

Consumer with Rebalance Callbacks

python
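from confluent_kafka import KafkaException

def on_assign(consumer, partitions):
    print(f'Assigned: {partitions}')

def on_revoke(consumer, partitions):
    print(f'Revoking: {partitions}')
    try:
        consumer.commit(asynchronous=False)  # Commit before losing partitions
    except KafkaException as exc:
        # Raised, for example, when there are no offsets to commit yet
        print(f'Commit on revoke failed: {exc}')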

consumer.subscribe(
    ['task-created'],
    on_assign=on_assign,
    on_revoke=on_revoke
)

Avro with Schema Registry

Producer with Avro Serialization

python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

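# Schema Registry client
sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

# Producer used below (same bootstrap address as the earlier examples)
producer = Producer({'bootstrap.servers': 'task-events-kafka-bootstrap:9092'})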

# Avro schema
task_schema = """
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.example.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}
"""

# Serializer
serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    to_dict=lambda obj, ctx: obj.__dict__
)

# Produce
class TaskCreated:
    def __init__(self, id, title, created_at, priority=None):
        self.id = id
        self.title = title
        self.created_at = created_at
        self.priority = priority

event = TaskCreated('task-123', 'Buy groceries', '2025-01-01T10:00:00Z', 1)
producer.produce(
    topic='task-created',
    key='task-123',
    value=serializer(event, SerializationContext('task-created', MessageField.VALUE))
)
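
The `priority` field above carries a default, which is what lets it be added without breaking backward compatibility. The sketch below checks just that one rule: every field added in a new schema version has a default. It is a deliberately simplified illustration, `added_fields_without_default` is a hypothetical helper, and Schema Registry's real compatibility checks cover more cases (type changes, removed fields, nested records).

```python
import json
from typing import List


def added_fields_without_default(old_schema: str, new_schema: str) -> List[str]:
    """Return names of fields added in new_schema that lack a default value."""
    old_names = {f["name"] for f in json.loads(old_schema)["fields"]}
    return [
        f["name"]
        for f in json.loads(new_schema)["fields"]
        if f["name"] not in old_names and "default" not in f
    ]


schema_v1 = json.dumps({"type": "record", "name": "TaskCreated", "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
]})
schema_v2 = json.dumps({"type": "record", "name": "TaskCreated", "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": None},
]})
schema_v3 = json.dumps({"type": "record", "name": "TaskCreated", "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "owner_id", "type": "string"},  # no default: old data cannot be read
]})

safe_change = added_fields_without_default(schema_v1, schema_v2)
unsafe_change = added_fields_without_default(schema_v1, schema_v3)
```

Here `safe_change` is empty, while `unsafe_change` lists `owner_id`, the field a reader using the new schema could not fill in for old records.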

Transactions (Exactly-Once)

Transactional Producer

python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'task-processor-1',
    'enable.idempotence': True
})

# Initialize transactions once
producer.init_transactions()

try:
    producer.begin_transaction()

    # Produce multiple messages atomically
    producer.produce('orders', key='o1', value='order-1')
    producer.produce('payments', key='p1', value='payment-1')
    producer.produce('audit', key='a1', value='audit-log')

    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise
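
For a consume-transform-produce pipeline, the consumer's offsets can be committed inside the same transaction as the output messages. The sketch below assumes a producer that already has a `transactional.id` and has called `init_transactions()`, and a consumer with `enable.auto.commit` set to `False`. `process_in_transaction` and its parameters are hypothetical names; the producer and consumer are passed in as arguments.

```python
from typing import Any, Callable


def process_in_transaction(consumer: Any, producer: Any, msg: Any,
                           output_topic: str,
                           transform: Callable[[bytes], bytes]) -> None:
    """Write the transformed message and the input offsets atomically."""
    producer.begin_transaction()
    try:
        producer.produce(output_topic, key=msg.key(), value=transform(msg.value()))
        # Commit the consumer's current positions as part of this transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        # Neither the output message nor the offsets become visible
        producer.abort_transaction()
        raise
```

After an abort the consumer's in-memory position is not rewound automatically, so a fuller implementation would also need to seek back to the last committed offsets before retrying.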

Transaction-Aware Consumer

python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'txn-consumer',
    'isolation.level': 'read_committed',  # Only read committed messages
    'enable.auto.commit': False
})

FastAPI Integration

Async Producer with Lifespan

python
import json
import uuid
from contextlib import asynccontextmanager

from confluent_kafka import Producer
from fastapi import FastAPI

producer = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer
    producer = Producer({'bootstrap.servers': 'kafka:9092'})
    yield
    producer.flush()

app = FastAPI(lifespan=lifespan)

@app.post("/tasks")
async def create_task(title: str):
    task_id = str(uuid.uuid4())
    event = {"id": task_id, "title": title}

    producer.produce(
        'task-created',
        key=task_id,
        value=json.dumps(event)
    )
    producer.poll(0)

    return {"id": task_id}

Background Consumer

python
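from contextlib import asynccontextmanager
from threading import Event, Thread

from confluent_kafka import Consumer
from fastapi import FastAPI

stop_event = Event()

def consume_loop():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'notification-service'
    })
    consumer.subscribe(['task-created'])

    try:
        while not stop_event.is_set():
            msg = consumer.poll(1.0)
            if msg and not msg.error():
                # Process message
                pass
    finally:
        consumer.close()  # Leave the group cleanly on shutdown

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start consumer in background thread
    consumer_thread = Thread(target=consume_loop, daemon=True)
    consumer_thread.start()
    yield
    # Signal the loop to stop and wait for consumer.close() to run
    stop_event.set()
    consumer_thread.join(timeout=5)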

Delivery Guarantees

| Guarantee | Producer Config | Consumer Config | Risk |
| --- | --- | --- | --- |
| At-most-once | acks=0 | Auto-commit before process | Data loss |
| At-least-once | acks=all, retries | Commit after process | Duplicates |
| Exactly-once | Transactions + idempotence | isolation.level=read_committed | Complexity |
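
With at-least-once delivery the consumer has to tolerate the duplicates listed in the table. One common approach is to make the handler idempotent by remembering which event IDs have already been processed. The sketch below assumes each event carries a unique `event_id`; `DeduplicatingHandler` is a hypothetical class, and the in-memory set is an assumption that a real service would likely replace with a durable store.

```python
from typing import Any, Callable, Dict, Set


class DeduplicatingHandler:
    """Wrap a handler so that each event_id is processed at most once."""

    def __init__(self, handler: Callable[[Dict[str, Any]], None]) -> None:
        self._handler = handler
        self._seen: Set[str] = set()  # in-memory only; lost on restart

    def handle(self, event: Dict[str, Any]) -> bool:
        """Process the event; return False if it was a duplicate."""
        event_id = event["event_id"]
        if event_id in self._seen:
            return False
        self._handler(event)
        # Record the ID only after the handler succeeded, so failures are retried
        self._seen.add(event_id)
        return True
```

Calling `handle()` twice with the same event invokes the wrapped handler only once.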

Task API Event Examples

Event Naming Convention

Domain: task
Events: task.created, task.updated, task.completed, task.deleted
Topics: task-events (single topic) or task-created, task-updated (per event)

Event Schema

json
{
  "event_id": "uuid",
  "event_type": "task.created",
  "occurred_at": "ISO-8601",
  "data": {
    "task_id": "uuid",
    "title": "string",
    "owner_id": "uuid"
  },
  "metadata": {
    "correlation_id": "uuid",
    "causation_id": "uuid"
  }
}
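
The envelope and naming convention above can be produced by a small helper. This is a minimal sketch: `make_event` and `topic_for` are hypothetical names, and defaulting `correlation_id` and `causation_id` to the event's own ID when none is supplied is an assumption of this example, not something the schema prescribes.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def make_event(event_type: str, data: Dict[str, Any],
               correlation_id: Optional[str] = None,
               causation_id: Optional[str] = None) -> Dict[str, Any]:
    """Wrap a payload in the event envelope shown above."""
    event_id = str(uuid.uuid4())
    return {
        "event_id": event_id,
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),  # ISO-8601, UTC
        "data": data,
        "metadata": {
            # Assumption: an event that starts a new flow references itself
            "correlation_id": correlation_id or event_id,
            "causation_id": causation_id or event_id,
        },
    }


def topic_for(event_type: str) -> str:
    """Map 'task.created' to the per-event topic name 'task-created'."""
    return event_type.replace(".", "-")


event = make_event("task.created", {"task_id": str(uuid.uuid4()), "title": "Buy groceries"})
topic = topic_for(event["event_type"])
```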

Safety & Guardrails

NEVER

  • Use acks=0 for critical data
  • Set max.in.flight.requests.per.connection > 5 with idempotence
  • Skip consumer.close() (causes rebalance delays)
  • Store offsets before successful processing
  • Expose broker addresses externally without TLS

ALWAYS

  • Use acks=all for important events
  • Enable idempotence for exactly-once
  • Handle KafkaError._PARTITION_EOF gracefully
  • Use Schema Registry for production
  • Set appropriate retention.ms for event topics
  • Monitor consumer lag
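
Consumer lag for a partition is the distance between the end of the log and the group's committed offset. The sketch below shows only the arithmetic; `partition_lag` is a hypothetical helper. With confluent-kafka-python the inputs could come from `Consumer.get_watermark_offsets()` and `Consumer.committed()`, and treating a negative committed offset as "nothing committed yet" is an assumption of this example.

```python
def partition_lag(high_watermark: int, committed_offset: int,
                  low_watermark: int = 0) -> int:
    """Number of messages between the committed offset and the end of the log."""
    if committed_offset < 0:
        # No committed offset yet: count everything still retained as lag
        committed_offset = low_watermark
    return max(high_watermark - committed_offset, 0)


lag = partition_lag(high_watermark=100, committed_offset=90)
lag_without_commit = partition_lag(high_watermark=100, committed_offset=-1001,
                                   low_watermark=20)
```

Summing this value across all partitions of a topic gives the total lag for the consumer group on that topic.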

Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| NOT_ENOUGH_REPLICAS | ISR below min.insync.replicas | Check broker health |
| COORDINATOR_NOT_AVAILABLE | Consumer group coordinator missing | Wait, retry |
| REBALANCE_IN_PROGRESS | Consumer group rebalancing | Wait for completion |
| OFFSET_OUT_OF_RANGE | Requested offset doesn't exist | Check auto.offset.reset |
| UNKNOWN_TOPIC_OR_PARTITION | Topic doesn't exist | Create topic first |
