plugins/fal-ai-master/skills/fal-optimization/SKILL.md
Complete fal.ai optimization system. PROACTIVELY activate for: (1) Queue vs run performance, (2) Parallel request batching, (3) Streaming for real-time UI, (4) WebSocket for interactive apps, (5) Model cost comparison, (6) Image size optimization, (7) Inference step tuning, (8) Webhook vs polling, (9) Result caching by seed, (10) Serverless scaling config. Provides: Parallel patterns, cost strategies, caching examples, monitoring setup. Ensures optimal performance and cost-effective usage.
| Optimization | Technique | Impact |
|--------------|-----------|--------|
| Parallel requests | Promise.all() with batches | 5-10x throughput |
| Avoid polling | Use webhooks | Fewer API calls |
| Cache by seed | Store prompt+seed results | Avoid regeneration |
| Right-size images | Use needed resolution | Lower cost |
| Fewer steps | Reduce inference steps | Faster, cheaper |
| Model Tier | Development | Production |
|------------|-------------|------------|
| Image | FLUX Schnell | FLUX.2 Pro |
| Video | Runway Turbo | Kling 2.6 Pro |
| Serverless Config | Cost-Optimized | Latency-Optimized |
|-------------------|----------------|-------------------|
| min_concurrency | 0 | 1+ |
| keep_alive (seconds) | 120 | 600+ |
| machine_type | Smallest viable | Higher tier |
Use for performance and cost optimization:
Related skills:
- fal-api-reference
- fal-model-guide
- fal-serverless-guide

Strategies for optimizing performance, reducing costs, and scaling fal.ai integrations.
Always prefer subscribe() over run() for generation tasks:
// Recommended: Queue-based with progress tracking
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  logs: true,
  onQueueUpdate: (update) => {
    // Show progress to users
    if (update.status === "IN_PROGRESS") {
      console.log("Generating...");
    }
  }
});

// Only use run() for fast endpoints (< 30s)
const quickResult = await fal.run("fal-ai/fast-sdxl", {
  input: { prompt: "quick test" }
});
Process multiple requests concurrently:
// JavaScript - Parallel execution
async function generateBatch(prompts: string[]) {
  const results = await Promise.all(
    prompts.map(prompt =>
      fal.subscribe("fal-ai/flux/dev", {
        input: { prompt }
      })
    )
  );
  return results;
}

// With rate limiting
async function generateBatchWithLimit(prompts: string[], limit = 5) {
  const results = [];
  for (let i = 0; i < prompts.length; i += limit) {
    const batch = prompts.slice(i, i + limit);
    const batchResults = await Promise.all(
      batch.map(prompt =>
        fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
      )
    );
    results.push(...batchResults);
    // Small delay between batches
    if (i + limit < prompts.length) {
      await new Promise(r => setTimeout(r, 100));
    }
  }
  return results;
}
# Python - Async parallel
import asyncio
import fal_client

async def generate_batch(prompts: list[str]) -> list[dict]:
    tasks = [
        fal_client.run_async("fal-ai/flux/dev", arguments={"prompt": p})
        for p in prompts
    ]
    return await asyncio.gather(*tasks)

# With semaphore for rate limiting
async def generate_batch_limited(prompts: list[str], limit: int = 5):
    semaphore = asyncio.Semaphore(limit)

    async def generate_one(prompt: str):
        async with semaphore:
            return await fal_client.run_async(
                "fal-ai/flux/dev",
                arguments={"prompt": prompt}
            )

    return await asyncio.gather(*[generate_one(p) for p in prompts])
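With a plain `asyncio.gather`, the first failed request raises and the results of the other requests are lost to the caller. The following is a minimal sketch of the semaphore pattern that also isolates failures. `run_limited` and `fake_generate` are hypothetical names; `fake_generate` stands in for a real `fal_client` call so the sketch runs offline.

```python
import asyncio
from typing import Awaitable, Callable


async def run_limited(
    prompts: list,
    worker: Callable[[str], Awaitable[dict]],
    limit: int = 5,
) -> list:
    """Run `worker` over prompts with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(prompt: str) -> dict:
        async with semaphore:
            return await worker(prompt)

    # return_exceptions=True returns failures as exception objects instead of
    # raising, and results keep the same order as `prompts`.
    return await asyncio.gather(
        *(guarded(p) for p in prompts), return_exceptions=True
    )


async def fake_generate(prompt: str) -> dict:
    """Hypothetical stand-in for a fal_client call."""
    await asyncio.sleep(0.01)
    if not prompt:
        raise ValueError("empty prompt")
    return {"prompt": prompt}


results = asyncio.run(run_limited(["a cat", "", "a dog"], fake_generate, limit=2))
failed = [r for r in results if isinstance(r, Exception)]
```

Callers can then retry or report only the entries in `failed` rather than rerunning the whole batch.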
Use streaming for progressive output:
// Show incremental progress
const stream = await fal.stream("fal-ai/flux/dev", {
  input: { prompt: "A landscape" }
});

for await (const event of stream) {
  updateProgressUI(event);
}

const result = await stream.done();
For real-time applications with continuous input:
const connection = fal.realtime.connect("fal-ai/lcm-sd15-i2i", {
  connectionKey: `user-${userId}`,
  throttleInterval: 128, // Throttle rapid inputs (milliseconds)
  onResult: (result) => {
    displayImage(result.images[0].url);
  }
});

// Send updates as user types/draws
inputElement.addEventListener('input', (e) => {
  connection.send({
    prompt: e.target.value,
    image_url: currentImage
  });
});
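To make the effect of a throttle interval concrete, here is a minimal sketch of one possible throttling policy: at most one send per interval, with calls inside the interval dropped. This illustrates the idea only and is not the fal client's implementation, whose exact behavior may differ. `Throttle` and its injectable `clock` are hypothetical.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Allow at most one send per `interval_ms`; calls in between are dropped."""

    def __init__(self, interval_ms: float, clock: Callable[[], float] = time.monotonic):
        self.interval = interval_ms / 1000.0
        self.clock = clock
        self.last_sent: Optional[float] = None

    def should_send(self) -> bool:
        now = self.clock()
        if self.last_sent is None or now - self.last_sent >= self.interval:
            self.last_sent = now
            return True
        return False


# Simulated input events at 0, 50, 130, 200 and 300 ms with a 128 ms interval.
ticks = iter([0.0, 0.05, 0.13, 0.2, 0.3])
throttle = Throttle(128, clock=lambda: next(ticks))
decisions = [throttle.should_send() for _ in range(5)]
```

In this simulation only the events at 0, 130 and 300 ms would be sent, so a fast typist triggers far fewer inference calls than keystrokes.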
import fal

class OptimizedApp(fal.App):
    machine_type = "GPU-A100"
    requirements = ["torch", "transformers", "accelerate"]
    volumes = {
        "/data": fal.Volume("model-cache")
    }

    def setup(self):
        # Runs once per worker, so the model is loaded here, not per request
        import torch
        from transformers import AutoModelForCausalLM

        # Use fp16 for faster inference and less memory
        self.model = AutoModelForCausalLM.from_pretrained(
            "model-name",
            torch_dtype=torch.float16,
            device_map="auto",
            cache_dir="/data/models"  # Persistent cache
        )
        # Enable optimizations (skipped for models that lack this method)
        if hasattr(self.model, 'enable_attention_slicing'):
            self.model.enable_attention_slicing()
class WarmApp(fal.App):
    machine_type = "GPU-A100"
    keep_alive = 600  # 10 minutes warm
    min_concurrency = 1  # Always keep one ready

    # Use lightweight health check
    @fal.endpoint("/health")
    def health(self):
        return {"status": "ok"}
class MemoryEfficientApp(fal.App):
    def setup(self):
        import torch

        # Use mixed precision (load_model is a placeholder for your own loader)
        self.model = load_model(torch_dtype=torch.float16)
        # Enable memory-efficient attention (a diffusers pipeline method; needs xformers)
        if hasattr(self.model, 'enable_xformers_memory_efficient_attention'):
            self.model.enable_xformers_memory_efficient_attention()

    def teardown(self):
        # Clean up GPU memory
        import torch

        if hasattr(self, 'model'):
            del self.model
        torch.cuda.empty_cache()

    @fal.endpoint("/generate")
    def generate(self, request):
        import torch

        with torch.inference_mode():  # Disable gradient tracking
            result = self.model(request.input)
        return result
| Need | Cheaper Option | Premium Option |
|------|---------------|----------------|
| Quick iteration | FLUX Schnell ($) | FLUX.1 Dev ($$) |
| Production | FLUX.1 Dev ($$) | FLUX.2 Pro ($$$) |
| Video preview | Runway Turbo ($$) | Kling Pro ($$$) |
// Development: Use fast/cheap models
const preview = await fal.subscribe("fal-ai/flux/schnell", {
  input: { prompt: "test", num_inference_steps: 4 }
});

// Production: Use quality models
const final = await fal.subscribe("fal-ai/flux-2-pro", {
  input: { prompt: "test" }
});
Generate at the size you need, not larger:
// Don't generate larger than needed
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    // Use a preset size
    image_size: "square_hd" // 1024x1024
    // Or pass specific dimensions instead (only one image_size key is allowed):
    // image_size: { width: 800, height: 600 }
  }
});
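The saving from a smaller image can be estimated from its pixel count. The sketch below assumes cost scales roughly linearly with megapixels, which is an assumption to check against the pricing of the specific model; `megapixels` and `relative_cost` are hypothetical helper names.

```python
def megapixels(width: int, height: int) -> float:
    """Pixel count of an image in millions of pixels."""
    return width * height / 1_000_000


def relative_cost(width: int, height: int, base: tuple = (1024, 1024)) -> float:
    """Pixel count relative to a baseline size (assumes cost scales with pixels)."""
    return megapixels(width, height) / megapixels(*base)


small = megapixels(800, 600)       # 0.48 MP
ratio = relative_cost(800, 600)    # roughly 0.46 of a 1024x1024 image
```

Under that assumption, an 800x600 image is a bit less than half the work of the `square_hd` preset.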
Find the minimum steps for acceptable quality:
// Quick previews: fewer steps
const preview = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 15 // Faster, slightly lower quality
  }
});

// Final render: more steps
const final = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 28 // Default, high quality
  }
});
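One way to find the minimum step count is to score a few candidates and keep the smallest one that clears a quality bar. The sketch below assumes you already have some scoring function, such as a human rating or an automated metric. `min_acceptable_steps` is a hypothetical helper, and the scores are made-up placeholders, not measurements.

```python
from typing import Callable, Iterable, Optional


def min_acceptable_steps(
    candidates: Iterable[int],
    score: Callable[[int], float],
    threshold: float,
) -> Optional[int]:
    """Return the smallest step count whose score meets the threshold."""
    for steps in sorted(candidates):
        if score(steps) >= threshold:
            return steps
    return None


# Made-up scores for illustration only; replace with your own evaluation.
example_scores = {8: 0.61, 15: 0.82, 20: 0.88, 28: 0.90}
chosen = min_acceptable_steps(example_scores, example_scores.__getitem__, threshold=0.8)
```

A `None` result means no candidate was good enough, which suggests either more steps or a different model.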
Avoid polling overhead with webhooks:
// Instead of polling
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  pollInterval: 1000 // Polling = more API calls
});

// Use webhooks
const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
  input: { prompt: "test" },
  webhookUrl: "https://your-server.com/webhook"
});
// No polling needed - result delivered to webhook
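On the receiving side, the webhook endpoint needs to parse the delivery, tolerate duplicates, and record the outcome. This is a minimal, framework-agnostic sketch. It assumes the body is JSON with `request_id`, `status`, `payload` and `error` fields, so verify the field names against the fal webhook documentation. `handle_webhook` and the in-memory `store` are hypothetical.

```python
import json


def handle_webhook(raw_body: bytes, store: dict) -> int:
    """Process one webhook delivery and return an HTTP status code."""
    try:
        event = json.loads(raw_body)
        request_id = event["request_id"]
    except (ValueError, KeyError, TypeError):
        return 400  # malformed body

    if request_id in store:
        return 200  # duplicate delivery; already handled

    # Assumed payload shape: status "OK" carries the result in "payload",
    # anything else carries details in "error".
    if event.get("status") == "OK":
        store[request_id] = {"ok": True, "result": event.get("payload")}
    else:
        store[request_id] = {"ok": False, "error": event.get("error")}
    return 200


store = {}
delivery = json.dumps(
    {"request_id": "r1", "status": "OK", "payload": {"images": []}}
).encode()
first_status = handle_webhook(delivery, store)
```

Keying the store by request ID makes the handler idempotent, which matters if a delivery is retried.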
Use seeds for reproducible outputs:
// Cache key based on prompt + seed ("cache" is any async key-value store)
async function generateCached(prompt: string, seed: number) {
  const cacheKey = `${prompt}-${seed}`;
  const cached = await cache.get(cacheKey);
  if (cached) {
    return cached;
  }

  const result = await fal.subscribe("fal-ai/flux/dev", {
    input: { prompt, seed }
  });
  await cache.set(cacheKey, result);
  return result;
}
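A key built only from prompt and seed returns a stale result if the model, image size, or step count changes. The following sketch derives the key from the model ID plus every input parameter. `cache_key` is a hypothetical helper, and the cache store itself is left to the caller.

```python
import hashlib
import json


def cache_key(model: str, inputs: dict) -> str:
    """Stable key covering the model and every input parameter."""
    # sort_keys makes the key independent of dict insertion order.
    canonical = json.dumps(
        {"model": model, "input": inputs}, sort_keys=True, separators=(",", ":")
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


key_a = cache_key("fal-ai/flux/dev", {"prompt": "a cat", "seed": 42})
key_b = cache_key("fal-ai/flux/dev", {"seed": 42, "prompt": "a cat"})
key_c = cache_key("fal-ai/flux/dev", {"prompt": "a cat", "seed": 43})
```

Here `key_a` and `key_b` are identical while `key_c` differs, so only a genuinely identical request reuses a cached result.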
class CostOptimizedApp(fal.App):
    machine_type = "GPU-A10G"  # Cheaper than A100 if sufficient
    min_concurrency = 0  # Scale to zero when not used
    keep_alive = 120  # Shorter keep-alive

# Use appropriate GPU for model size
# T4: < 16GB VRAM models
# A10G: 16-24GB VRAM models
# A100: 24-80GB VRAM models
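The VRAM thresholds in the comments above can be written as a small lookup. This is only a sketch of that rule of thumb: `pick_machine_type` is a hypothetical helper, the `"GPU-T4"` identifier is an assumption modeled on the other machine type names, and the 24GB boundary is resolved toward the cheaper GPU.

```python
def pick_machine_type(vram_gb: float) -> str:
    """Map a model's VRAM requirement to the smallest GPU tier listed above."""
    if vram_gb <= 0:
        raise ValueError("vram_gb must be positive")
    if vram_gb < 16:
        return "GPU-T4"  # assumed identifier; check the available machine types
    if vram_gb <= 24:
        return "GPU-A10G"
    if vram_gb <= 80:
        return "GPU-A100"
    raise ValueError("model does not fit on a single GPU from this list")


choice = pick_machine_type(20)
```

Measure real peak memory under load before committing to a tier, since activations and batch size add to the weight footprint.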
class ScalableApp(fal.App):
    machine_type = "GPU-A100"
    min_concurrency = 2  # Always have 2 instances
    max_concurrency = 20  # Scale up to 20

    # fal handles auto-scaling based on queue depth
class BatchApp(fal.App):
    @fal.endpoint("/batch")
    def batch_generate(self, prompts: list[str]) -> list[dict]:
        # Process multiple prompts in one request
        results = []
        for prompt in prompts:
            result = self.model(prompt)
            results.append(result)
        return results
Use different endpoints for different priorities:
class PriorityApp(fal.App):
    machine_type = "GPU-A100"

    @fal.endpoint("/high-priority")
    def high_priority(self, request):
        # Separate endpoint for important requests
        return self.process(request)

    @fal.endpoint("/standard")
    def standard(self, request):
        # Standard processing
        return self.process(request)
import logging
import time

class MonitoredApp(fal.App):
    def setup(self):
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.logger.info("App starting up")
        # Load model
        self.logger.info("Model loaded successfully")

    @fal.endpoint("/generate")
    def generate(self, request):
        start = time.time()
        result = self.process(request)
        elapsed = time.time() - start
        self.logger.info(f"Request processed in {elapsed:.2f}s")
        return result
// Client-side timing
const start = Date.now();
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  onQueueUpdate: (update) => {
    if (update.status === "IN_QUEUE") {
      console.log(`Queue position: ${update.queue_position}`);
    }
  }
});
const elapsed = Date.now() - start;
console.log(`Total time: ${elapsed}ms`);

// Track in your analytics
analytics.track("fal_generation", {
  model: "flux/dev",
  elapsed_ms: elapsed,
  queue_time_ms: result.timings?.queue,
  inference_time_ms: result.timings?.inference
});
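Individual timings are noisy, so it usually helps to summarize them as percentiles before acting on them. A minimal sketch using only the standard library follows. `latency_summary` is a hypothetical helper, and the sample values are synthetic.

```python
import statistics


def latency_summary(samples_ms: list) -> dict:
    """Summarize request latencies (milliseconds) as p50, p95 and max."""
    if len(samples_ms) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; index 94 is the 95th percentile.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {
        "p50": statistics.median(samples_ms),
        "p95": cuts[94],
        "max": max(samples_ms),
    }


# Synthetic samples: 1 ms through 101 ms.
summary = latency_summary([float(ms) for ms in range(1, 102)])
```

Tracking p95 alongside the median shows whether slow requests come from occasional cold starts or from consistently long inference.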
// fallbackResult and errorTracker are placeholders for your own helpers
async function generateWithFallback() {
  try {
    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: { prompt: "test" }
    });
    return result;
  } catch (error) {
    // Log to error tracking service
    errorTracker.captureException(error, {
      tags: {
        model: "flux/dev",
        type: error.constructor.name
      },
      extra: {
        status: error.status,
        body: error.body
      }
    });
    // Handle gracefully
    return fallbackResult();
  }
}
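Transient failures such as timeouts are often worth retrying before falling back. The sketch below shows exponential backoff with jitter around an arbitrary callable. `retry_with_backoff` is a hypothetical helper, and the exception types treated as retryable are assumptions to adapt to the errors your client actually raises.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retryable: tuple = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying retryable errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts; surface the last error
            delay = base_delay * (2 ** attempt)
            # Jitter spreads out retries from many clients.
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("attempts must be at least 1")


calls = {"n": 0}


def flaky() -> str:
    """Simulated request that times out twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


delays = []
outcome = retry_with_backoff(flaky, attempts=3, sleep=delays.append)
```

Errors outside `retryable`, such as validation failures, propagate immediately, since retrying them only adds cost.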
- Use queue-based execution (`subscribe`)
- Load models in `setup()`, not per-request
- Scale to zero when idle (`min_concurrency = 0`)
- Tune the `keep_alive` setting