Agent skill
fal-optimization
Install this agent skill to your Project
npx add-skill https://github.com/JosiahSiegel/claude-plugin-marketplace/tree/main/plugins/fal-ai-master/skills/fal-optimization
SKILL.md
Quick Reference
| Optimization | Technique | Impact |
|---|---|---|
| Parallel requests | Promise.all() with batches | 5-10x throughput |
| Avoid polling | Use webhooks | Lower API calls |
| Cache by seed | Store prompt+seed results | Avoid regeneration |
| Right-size images | Use needed resolution | Lower cost |
| Fewer steps | Reduce inference steps | Faster, cheaper |
| Model Tier | Development | Production |
|---|---|---|
| Image | FLUX Schnell | FLUX.2 Pro |
| Video | Runway Turbo | Kling 2.6 Pro |
| Serverless Config | Cost-Optimized | Latency-Optimized |
|---|---|---|
| min_concurrency | 0 | 1+ |
| keep_alive | 120 | 600+ |
| machine_type | Smallest viable | Higher tier |
When to Use This Skill
Use for performance and cost optimization:
- Reducing generation latency
- Lowering API costs
- Implementing parallel processing
- Choosing between polling and webhooks
- Configuring serverless scaling
Related skills:
- For API patterns: see fal-api-reference
- For model selection: see fal-model-guide
- For serverless config: see fal-serverless-guide
fal.ai Performance and Cost Optimization
Strategies for optimizing performance, reducing costs, and scaling fal.ai integrations.
Performance Optimization
Client-Side Optimizations
1. Use Queue-Based Execution
Always prefer subscribe() over run() for generation tasks:
import { fal } from "@fal-ai/client";

// Recommended: Queue-based with progress tracking
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  logs: true,
  onQueueUpdate: (update) => {
    // Show progress to users
    if (update.status === "IN_PROGRESS") {
      console.log("Generating...");
    }
  }
});

// Only use run() for fast endpoints (< 30s)
const quickResult = await fal.run("fal-ai/fast-sdxl", {
  input: { prompt: "quick test" }
});
2. Parallel Requests
Process multiple requests concurrently:
// TypeScript - Parallel execution
async function generateBatch(prompts: string[]) {
  const results = await Promise.all(
    prompts.map(prompt =>
      fal.subscribe("fal-ai/flux/dev", {
        input: { prompt }
      })
    )
  );
  return results;
}

// With a concurrency limit: send prompts in batches of `limit`
async function generateBatchWithLimit(prompts: string[], limit = 5) {
  const results = [];
  for (let i = 0; i < prompts.length; i += limit) {
    const batch = prompts.slice(i, i + limit);
    const batchResults = await Promise.all(
      batch.map(prompt =>
        fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
      )
    );
    results.push(...batchResults);
    // Small delay between batches
    if (i + limit < prompts.length) {
      await new Promise(r => setTimeout(r, 100));
    }
  }
  return results;
}
# Python - Async parallel
import asyncio
import fal_client

async def generate_batch(prompts: list[str]) -> list[dict]:
    tasks = [
        fal_client.run_async("fal-ai/flux/dev", arguments={"prompt": p})
        for p in prompts
    ]
    return await asyncio.gather(*tasks)

# With semaphore for rate limiting
async def generate_batch_limited(prompts: list[str], limit: int = 5):
    semaphore = asyncio.Semaphore(limit)

    async def generate_one(prompt: str):
        async with semaphore:
            return await fal_client.run_async(
                "fal-ai/flux/dev",
                arguments={"prompt": prompt}
            )

    return await asyncio.gather(*[generate_one(p) for p in prompts])
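Note that asyncio.gather (like Promise.all) raises on the first failure by default, so one bad prompt can discard an entire batch. The following is a minimal sketch of a limiter that keeps results in input order and records failures per item. The `generate` parameter and `fake_generate` are hypothetical stand-ins for a real call such as fal_client.run_async; they exist only so the example runs offline.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def gather_limited(
    prompts: list[str],
    generate: Callable[[str], Awaitable[Any]],
    limit: int = 5,
) -> list[Any]:
    """Run generate(prompt) for every prompt with at most `limit` in flight.

    Results come back in input order. A failed call yields its exception
    object in that slot instead of aborting the whole batch.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(prompt: str) -> Any:
        async with semaphore:
            return await generate(prompt)

    return await asyncio.gather(
        *(run_one(p) for p in prompts), return_exceptions=True
    )


async def fake_generate(prompt: str) -> dict:
    # Hypothetical stand-in for a real API call.
    await asyncio.sleep(0.01)
    if not prompt:
        raise ValueError("empty prompt")
    return {"prompt": prompt}


results = asyncio.run(gather_limited(["a", "", "b"], fake_generate, limit=2))
failed = [r for r in results if isinstance(r, Exception)]
```

Callers can then retry or report only the entries in `failed` rather than regenerating the successful ones.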
3. Streaming for Real-Time Feedback
For endpoints that support streaming, use it to show progressive output:
// Show incremental progress
const stream = await fal.stream("fal-ai/flux/dev", {
  input: { prompt: "A landscape" }
});
for await (const event of stream) {
  updateProgressUI(event);
}
const result = await stream.done();
4. WebSockets for Interactive Apps
For real-time applications with continuous input:
const connection = fal.realtime.connect("fal-ai/lcm-sd15-i2i", {
  connectionKey: `user-${userId}`,
  throttleInterval: 128, // Throttle rapid inputs (interval in ms)
  onResult: (result) => {
    displayImage(result.images[0].url);
  }
});
// Send updates as user types/draws
inputElement.addEventListener('input', (e) => {
  connection.send({
    prompt: e.target.value,
    image_url: currentImage
  });
});
Server-Side Optimizations (Serverless)
1. Efficient Model Loading
import fal

class OptimizedApp(fal.App):
    machine_type = "GPU-A100"
    requirements = ["torch", "transformers", "accelerate"]
    volumes = {
        "/data": fal.Volume("model-cache")
    }

    def setup(self):
        import torch
        from transformers import AutoModelForCausalLM

        # Use fp16 for faster inference and less memory
        self.model = AutoModelForCausalLM.from_pretrained(
            "model-name",
            torch_dtype=torch.float16,
            device_map="auto",
            cache_dir="/data/models"  # Persistent cache
        )
        # Attention slicing is a diffusers pipeline feature;
        # enable it only if the loaded object supports it
        if hasattr(self.model, 'enable_attention_slicing'):
            self.model.enable_attention_slicing()
2. Reduce Cold Starts
class WarmApp(fal.App):
    machine_type = "GPU-A100"
    keep_alive = 600  # 10 minutes warm
    min_concurrency = 1  # Always keep one ready

    # Use lightweight health check
    @fal.endpoint("/health")
    def health(self):
        return {"status": "ok"}
3. Memory Management
class MemoryEfficientApp(fal.App):
    def setup(self):
        import torch

        # Load weights in half precision (fp16) to reduce memory use
        self.model = load_model(torch_dtype=torch.float16)  # load_model: your own loader
        # xformers attention is a diffusers pipeline method; enable it only if available
        if hasattr(self.model, "enable_xformers_memory_efficient_attention"):
            self.model.enable_xformers_memory_efficient_attention()

    def teardown(self):
        # Clean up GPU memory
        import torch

        if hasattr(self, 'model'):
            del self.model
        torch.cuda.empty_cache()

    @fal.endpoint("/generate")
    def generate(self, request):
        import torch

        with torch.inference_mode():  # Disable gradient tracking
            result = self.model(request.input)
        return result
Cost Optimization
1. Choose the Right Model
| Need | Cheaper Option | Premium Option |
|---|---|---|
| Quick iteration | FLUX Schnell ($) | FLUX.1 Dev ($$) |
| Production | FLUX.1 Dev ($$) | FLUX.2 Pro ($$$) |
| Video preview | Runway Turbo ($$) | Kling Pro ($$$) |
// Development: Use fast/cheap models
const preview = await fal.subscribe("fal-ai/flux/schnell", {
  input: { prompt: "test", num_inference_steps: 4 }
});

// Production: Use quality models
const final = await fal.subscribe("fal-ai/flux-2-pro", {
  input: { prompt: "test" }
});
2. Optimize Image Sizes
Generate at the size you need, not larger:
// Don't generate larger than needed
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    // Option 1: use a preset size
    image_size: "square_hd" // 1024x1024
    // Option 2: pass explicit dimensions instead of a preset
    // image_size: { width: 800, height: 600 }
  }
});
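To see how much a smaller size can save, compare pixel counts. The sketch below assumes cost scales linearly with the number of pixels generated, which is an assumption for illustration, not a statement of fal.ai pricing; check the pricing of the model you use.

```python
def megapixels(width: int, height: int) -> float:
    """Return the image area in megapixels (millions of pixels)."""
    return width * height / 1_000_000


def relative_cost(
    width: int, height: int, ref_width: int = 1024, ref_height: int = 1024
) -> float:
    """Cost of a width x height image relative to the reference size.

    ASSUMPTION: price is proportional to pixel count.
    """
    return megapixels(width, height) / megapixels(ref_width, ref_height)


print(round(megapixels(1024, 1024), 3))   # 1.049
print(round(megapixels(800, 600), 3))     # 0.48
print(round(relative_cost(800, 600), 2))  # 0.46
```

Under that assumption, an 800x600 image costs a little under half as much as the 1024x1024 preset.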
3. Reduce Inference Steps
Find the minimum steps for acceptable quality:
// Quick previews: fewer steps
const preview = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 15 // Faster, slightly lower quality
  }
});

// Final render: more steps
const final = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 28 // Default, high quality
  }
});
4. Use Webhooks for High Volume
Avoid polling overhead with webhooks:
// Instead of polling
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  pollInterval: 1000 // Polling = more API calls
});

// Use webhooks
const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
  input: { prompt: "test" },
  webhookUrl: "https://your-server.com/webhook"
});
// No polling needed - result delivered to webhook
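On the receiving side, the webhook handler has to match each delivery back to the job that submitted it. The following is a framework-independent sketch of that step. The payload field names (request_id, status, payload, error) and the "OK" status value are assumptions about the delivery format, and `pending` is a hypothetical in-memory map; verify the fields against the webhook documentation and use a durable store in production.

```python
import json


def handle_webhook(body: bytes, pending: dict) -> dict:
    """Match a webhook delivery to the job that submitted it.

    `pending` maps request_id -> your own job id, filled in at submit time.
    ASSUMPTION: the JSON body carries request_id, status, payload and error.
    """
    event = json.loads(body)
    job_id = pending.pop(event.get("request_id"), None)
    if job_id is None:
        # Unknown or repeated delivery: acknowledge it but do nothing.
        return {"job_id": None, "ok": False, "result": None}
    if event.get("status") == "OK":
        return {"job_id": job_id, "ok": True, "result": event.get("payload")}
    return {"job_id": job_id, "ok": False, "result": event.get("error")}


# Usage with a made-up delivery body
pending_jobs = {"req-123": "job-1"}
sample_body = json.dumps(
    {"request_id": "req-123", "status": "OK", "payload": {"images": []}}
).encode("utf-8")
first = handle_webhook(sample_body, pending_jobs)
second = handle_webhook(sample_body, pending_jobs)  # repeated delivery is ignored
```

Removing the entry from `pending` on first delivery keeps the handler idempotent if the same event arrives more than once.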
5. Cache Results
Use seeds for reproducible outputs:
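// Cache key based on model + prompt + seed
// (`cache` is your own store, e.g. Redis or an in-memory Map wrapper)
async function generateCached(prompt: string, seed: number) {
  const cacheKey = JSON.stringify(["fal-ai/flux/dev", prompt, seed]);
  const cached = await cache.get(cacheKey);
  if (cached) {
    return cached;
  }
  const result = await fal.subscribe("fal-ai/flux/dev", {
    input: { prompt, seed }
  });
  await cache.set(cacheKey, result);
  return result;
}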
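Any input that changes the output (model ID, image size, step count) belongs in the cache key, not just the prompt and seed. One way to do that, sketched here with a hypothetical `cache_key` helper, is to hash a canonical JSON encoding of the model ID and the full argument set so that key order does not matter.

```python
import hashlib
import json


def cache_key(model: str, arguments: dict) -> str:
    """Return a stable SHA-256 key for a (model, arguments) pair.

    sort_keys makes the encoding independent of dict insertion order,
    so logically equal requests map to the same key.
    """
    canonical = json.dumps(
        {"model": model, "arguments": arguments},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


key_a = cache_key("fal-ai/flux/dev", {"prompt": "test", "seed": 42})
key_b = cache_key("fal-ai/flux/dev", {"seed": 42, "prompt": "test"})
key_c = cache_key("fal-ai/flux/dev", {"prompt": "test", "seed": 43})
```

Here `key_a` and `key_b` are identical, while `key_c` differs because the seed changed.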
6. Serverless Cost Optimization
class CostOptimizedApp(fal.App):
    machine_type = "GPU-A10G"  # Cheaper than A100 if sufficient
    min_concurrency = 0  # Scale to zero when not used
    keep_alive = 120  # Shorter keep-alive

# Use appropriate GPU for model size
# T4: < 16GB VRAM models
# A10G: 16-24GB VRAM models
# A100: 24-80GB VRAM models
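The sizing rule in the comments above can be written as a small lookup. This is only a sketch of those thresholds: `pick_gpu` is a hypothetical helper, the returned names are the short GPU labels from the comments rather than confirmed machine_type strings, and the choice of the cheaper GPU at the 24 GB boundary is an assumption.

```python
def pick_gpu(model_vram_gb: float) -> str:
    """Return the smallest GPU class from the table that fits the model.

    Thresholds follow the comments above: T4 below 16 GB, A10G for
    16-24 GB, A100 for up to 80 GB.
    """
    if model_vram_gb <= 0:
        raise ValueError("model_vram_gb must be positive")
    if model_vram_gb < 16:
        return "T4"
    if model_vram_gb <= 24:
        return "A10G"  # ASSUMPTION: exactly 24 GB goes to the cheaper GPU
    if model_vram_gb <= 80:
        return "A100"
    raise ValueError("model does not fit on a single GPU in this table")


print(pick_gpu(7))   # T4
print(pick_gpu(20))  # A10G
print(pick_gpu(40))  # A100
```

In practice, leave some headroom for activations and batch size on top of the weight footprint before picking a tier.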
Scaling Strategies
1. Horizontal Scaling
class ScalableApp(fal.App):
    machine_type = "GPU-A100"
    min_concurrency = 2  # Always have 2 instances
    max_concurrency = 20  # Scale up to 20
    # fal handles auto-scaling based on queue depth
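A rough starting point for choosing the concurrency bounds is Little's law: average requests in flight = arrival rate x average time per request. The sketch below assumes each instance handles one request at a time and adds a 20% headroom factor. Both are assumptions to tune against real traffic, and `runners_needed` is a hypothetical helper.

```python
import math


def runners_needed(
    requests_per_second: float,
    seconds_per_request: float,
    headroom: float = 1.2,
) -> int:
    """Estimate how many one-request-at-a-time instances a load needs.

    Little's law gives the average number of requests in flight;
    `headroom` pads that figure to absorb bursts.
    """
    in_flight = requests_per_second * seconds_per_request
    return max(1, math.ceil(in_flight * headroom))


print(runners_needed(0.5, 8.0))  # 5
print(runners_needed(2.0, 8.0))  # 20
```

Evaluating it at typical load suggests a value for min_concurrency, and at peak load a value for max_concurrency.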
2. Request Batching
class BatchApp(fal.App):
    @fal.endpoint("/batch")
    def batch_generate(self, prompts: list[str]) -> list[dict]:
        # Process multiple prompts in one request
        results = []
        for prompt in prompts:
            result = self.model(prompt)
            results.append(result)
        return results
3. Priority Queues
Use different endpoints for different priorities:
class PriorityApp(fal.App):
    machine_type = "GPU-A100"

    @fal.endpoint("/high-priority")
    def high_priority(self, request):
        # Separate endpoint for important requests
        return self.process(request)

    @fal.endpoint("/standard")
    def standard(self, request):
        # Standard processing
        return self.process(request)
Monitoring and Debugging
1. Add Logging
import logging

class MonitoredApp(fal.App):
    def setup(self):
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.logger.info("App starting up")
        # Load model
        self.logger.info("Model loaded successfully")

    @fal.endpoint("/generate")
    def generate(self, request):
        import time

        start = time.time()
        result = self.process(request)
        elapsed = time.time() - start
        self.logger.info(f"Request processed in {elapsed:.2f}s")
        return result
2. Track Metrics
// Client-side timing
const start = Date.now();
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  onQueueUpdate: (update) => {
    if (update.status === "IN_QUEUE") {
      console.log(`Queue position: ${update.queue_position}`);
    }
  }
});
const elapsed = Date.now() - start;
console.log(`Total time: ${elapsed}ms`);

// Track in your analytics
analytics.track("fal_generation", {
  model: "flux/dev",
  elapsed_ms: elapsed,
  queue_time_ms: result.timings?.queue,
  inference_time_ms: result.timings?.inference
});
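Once timings are collected, percentiles describe user-facing latency better than the mean, because a single slow request (a cold start, for example) skews the average. The following is a minimal nearest-rank percentile sketch; the sample latencies are invented for illustration.

```python
import math


def percentile(samples: list[float], pct: float) -> float:
    """Return the nearest-rank percentile of `samples` (pct in (0, 100])."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))
    return ordered[rank - 1]


# Invented elapsed_ms values; one request hit a slow path.
latencies_ms = [820, 910, 870, 4300, 890, 860, 900, 880, 850, 905]
summary = {
    "mean": sum(latencies_ms) / len(latencies_ms),
    "p50": percentile(latencies_ms, 50),
    "p95": percentile(latencies_ms, 95),
}
print(summary)  # {'mean': 1218.5, 'p50': 880, 'p95': 4300}
```

The median stays near 880 ms while the mean is pulled above 1200 ms by one outlier, which is the case p95 alerts are meant to surface.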
3. Error Monitoring
async function generateWithFallback() {
  try {
    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: { prompt: "test" }
    });
    return result;
  } catch (error) {
    // Log to error tracking service
    errorTracker.captureException(error, {
      tags: {
        model: "flux/dev",
        type: error.constructor.name
      },
      extra: {
        status: error.status,
        body: error.body
      }
    });
    // Handle gracefully
    return fallbackResult();
  }
}
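Before falling back, transient failures are often worth a few retries. The following is a minimal sketch of retrying with exponential backoff and jitter. Which exceptions count as retryable is an assumption here (TimeoutError and ConnectionError), and `call` is a hypothetical zero-argument wrapper around the real request.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    retryable: tuple = (TimeoutError, ConnectionError),
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call()` until it succeeds or `max_attempts` is reached.

    The delay doubles after each failure (0.5s, 1s, 2s, ...) plus random
    jitter. Non-retryable exceptions propagate immediately.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


# Usage: a fake call that fails twice, then succeeds.
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


outcome = retry_with_backoff(flaky, sleep=lambda _seconds: None)
```

Passing `sleep` as a parameter keeps the helper testable without real delays; the error-tracking and fallback logic above still applies once retries are exhausted.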
Checklist
Before Production
- Using queue-based execution (subscribe)
- Appropriate model selected for use case
- Image sizes optimized
- Error handling implemented
- Rate limiting in place
- Caching strategy defined
Serverless Deployment
- Correct machine type for model size
- Models loaded in setup(), not per-request
- Persistent volumes for large models
- Secrets properly configured
- Health check endpoint
- Logging enabled
Cost Management
- Scale-to-zero enabled (min_concurrency = 0)
- Appropriate keep_alive setting
- Using cheaper models for development
- Batch processing where possible
- Webhook callbacks instead of polling
Monitoring
- Latency tracking
- Error rate monitoring
- Cost tracking
- Queue depth alerts