Agent skill
Workflows Patterns
Use when asking about "Workflows", "durable workflows", "multi-step processing", "WorkflowEntrypoint", "WorkflowStep", "data pipelines", "background jobs", "ingestion pipeline", or orchestrating multiple operations that need durability and error recovery.
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/workflows-patterns
SKILL.md
Cloudflare Workflows
Purpose
This skill provides guidance on Cloudflare Workflows, a durable execution engine for multi-step background processing. Use Workflows when you need guaranteed completion of complex operations that span multiple bindings (D1, R2, Vectorize, AI) with automatic retry and state persistence.
When to Use Workflows
Good use cases:
- Data ingestion pipelines (fetch → store → process → index)
- Document processing (upload → chunk → embed → vectorize)
- Multi-step AI operations (generate → validate → store)
- Background jobs that must complete even if Workers restart
- Operations that need transaction-like guarantees
Not needed for:
- Simple request/response handlers
- Single-step operations
- Real-time operations where latency matters
- Operations that can safely fail silently
Core Concepts
WorkflowEntrypoint
The main class that defines your workflow:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
interface Env {
DATABASE: D1Database;
STORAGE: R2Bucket;
AI: Ai;
VECTOR_INDEX: VectorizeIndex;
}
interface WorkflowParams {
documentId: string;
content: string;
}
export class MyWorkflow extends WorkflowEntrypoint<Env, WorkflowParams> {
async run(event: WorkflowEvent<WorkflowParams>, step: WorkflowStep) {
const { documentId, content } = event.payload;
// Steps go here...
}
}
WorkflowStep
Each step.do() call is a durable checkpoint:
// Step 1: Store in R2
const storedPath = await step.do('store-document', async () => {
const path = `documents/${documentId}.txt`;
await this.env.STORAGE.put(path, content);
return path;
});
// Step 2: Create D1 record
const recordId = await step.do('create-record', async () => {
const result = await this.env.DATABASE
.prepare('INSERT INTO documents (id, path) VALUES (?, ?)')
.bind(documentId, storedPath)
.run();
return documentId;
});
Key properties:
- Steps are named with unique strings
- Step results are persisted automatically
- If a step fails, workflow pauses and can retry
- Completed steps are not re-executed on retry
Data Ingestion Pipeline Pattern
Complete example based on real-world usage:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
interface Env {
DATABASE: D1Database;
ARTICLES_BUCKET: R2Bucket;
AI: Ai;
VECTOR_INDEX: VectorizeIndex;
DEFAULT_CHUNK_SIZE: number;
DEFAULT_CHUNK_OVERLAP: number;
}
interface IngestionParams {
articleId: string;
title: string;
content: string;
}
export class IngestionWorkflow extends WorkflowEntrypoint<Env, IngestionParams> {
async run(event: WorkflowEvent<IngestionParams>, step: WorkflowStep) {
const { articleId, title, content } = event.payload;
try {
// Step 1: Store raw article in R2
await step.do('store-article', async () => {
await this.env.ARTICLES_BUCKET.put(
`articles/${articleId}.json`,
JSON.stringify({ id: articleId, title, content }),
{ httpMetadata: { contentType: 'application/json' } }
);
return { success: true };
});
// Step 2: Create document record in D1
const documentId = await step.do('create-document', async () => {
const id = crypto.randomUUID();
await this.env.DATABASE
.prepare('INSERT INTO documents (id, article_id, title) VALUES (?, ?, ?)')
.bind(id, articleId, title)
.run();
return id;
});
// Step 3: Split into chunks
const chunks = await step.do('split-text', async () => {
return this.splitIntoChunks(content, title);
});
// Step 4: Store chunks in D1
const chunkRecords = await step.do('store-chunks', async () => {
const records = [];
for (const [index, text] of chunks.entries()) {
const chunkId = crypto.randomUUID();
await this.env.DATABASE
.prepare('INSERT INTO chunks (id, document_id, text, chunk_index) VALUES (?, ?, ?, ?)')
.bind(chunkId, documentId, text, index)
.run();
records.push({ id: chunkId, text, index });
}
return records;
});
// Step 5: Generate embeddings
const embeddings = await step.do('generate-embeddings', async () => {
const texts = chunkRecords.map(c => c.text);
const result = await this.env.AI.run('@cf/baai/bge-base-en-v1.5', {
text: texts
}) as { data: number[][] };
return result.data;
});
// Step 6: Insert vectors into Vectorize
await step.do('insert-vectors', async () => {
const vectors = chunkRecords.map((chunk, idx) => ({
id: chunk.id,
values: embeddings[idx],
metadata: { documentId, chunkId: chunk.id, title }
}));
await this.env.VECTOR_INDEX.upsert(vectors);
return { count: vectors.length };
});
return {
success: true,
documentId,
chunksCreated: chunkRecords.length,
vectorsInserted: embeddings.length
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
}
private splitIntoChunks(content: string, title: string): string[] {
const chunkSize = this.env.DEFAULT_CHUNK_SIZE || 500;
const overlap = this.env.DEFAULT_CHUNK_OVERLAP || 100;
const chunks: string[] = [];
let start = 0;
while (start < content.length) {
const end = Math.min(start + chunkSize, content.length);
chunks.push(content.slice(start, end));
start = end - overlap;
if (start >= content.length) break;
}
return chunks;
}
}
Wrangler Configuration
// wrangler.jsonc
{
"name": "my-worker",
"main": "src/index.ts",
"compatibility_date": "2024-01-15",
"workflows": [
{
"name": "ingestion-workflow",
"binding": "INGESTION_WORKFLOW",
"class_name": "IngestionWorkflow"
}
],
"d1_databases": [
{ "binding": "DATABASE", "database_name": "my-db", "database_id": "..." }
],
"r2_buckets": [
{ "binding": "ARTICLES_BUCKET", "bucket_name": "articles" }
],
"vectorize": [
{ "binding": "VECTOR_INDEX", "index_name": "embeddings" }
],
"ai": { "binding": "AI" }
}
Triggering Workflows
From a Worker
export default {
async fetch(request: Request, env: Env) {
const { articleId, title, content } = await request.json();
// Create a workflow instance
const instance = await env.INGESTION_WORKFLOW.create({
id: `ingest-${articleId}`,
params: { articleId, title, content }
});
return Response.json({
workflowId: instance.id,
status: 'started'
});
}
};
Checking Workflow Status
// Get workflow status
const status = await env.INGESTION_WORKFLOW.get(workflowId);
console.log({
status: status.status, // 'running', 'complete', 'error'
output: status.output // Result from run() if complete
});
Error Handling
Step-Level Retry
Steps automatically retry on failure. Control retry behavior:
await step.do('risky-operation', {
retries: { limit: 3, delay: '10 seconds', backoff: 'exponential' }
}, async () => {
// This will retry up to 3 times with exponential backoff
return await riskyOperation();
});
Workflow-Level Error Handling
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
try {
// All steps...
return { success: true, result: '...' };
} catch (error) {
// Log error for debugging
console.error('Workflow failed:', error);
// Return failure result
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
}
Best Practices
Step Naming
Use descriptive, unique step names:
store-article(good)step-1(bad - not descriptive)store-${articleId}(bad - step names should be static)
Step Granularity
- Make steps atomic (do one thing)
- Don't put retryable and non-retryable operations in same step
- Keep steps focused for better retry behavior
Idempotency
Steps should be idempotent (safe to retry):
// Good: Use upsert instead of insert
await this.env.VECTOR_INDEX.upsert(vectors);
// Bad: Insert might fail on retry
await this.env.VECTOR_INDEX.insert(vectors);
Batching
Batch operations within steps for efficiency:
// Good: Batch embedding generation
const embeddings = await step.do('generate-embeddings', async () => {
const batchSize = 10;
const allEmbeddings = [];
for (let i = 0; i < texts.length; i += batchSize) {
const batch = texts.slice(i, i + batchSize);
const result = await this.env.AI.run(model, { text: batch });
allEmbeddings.push(...result.data);
}
return allEmbeddings;
});
Common Patterns
Conditional Steps
const needsProcessing = await step.do('check-exists', async () => {
const existing = await this.env.DATABASE
.prepare('SELECT id FROM documents WHERE article_id = ?')
.bind(articleId)
.first();
return !existing;
});
if (needsProcessing) {
await step.do('process-new', async () => {
// Only runs for new documents
});
}
Parallel Steps
// Steps must be sequential, but you can parallelize within a step
await step.do('parallel-operations', async () => {
const [result1, result2] = await Promise.all([
this.env.DATABASE.prepare('...').run(),
this.env.STORAGE.put('...', data)
]);
return { result1, result2 };
});
Cleanup on Failure
try {
// ... workflow steps ...
} catch (error) {
// Cleanup step runs even on failure
await step.do('cleanup', async () => {
await this.env.STORAGE.delete(`temp/${workflowId}`);
});
throw error; // Re-throw to mark workflow as failed
}
Monitoring
Check workflow status via the Cloudflare dashboard or API:
- Workflow runs in progress
- Completed workflows
- Failed workflows with error details
- Step-by-step execution logs
Additional Resources
- Cloudflare Workflows documentation: https://developers.cloudflare.com/workflows/
- Use the cloudflare-docs-specialist agent to search for latest Workflows docs
- Reference the workers-development skill for general Workers patterns
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
Didn't find tool you were looking for?