i18n/de/skills/orchestrate-ml-pipeline/SKILL.md
Orchestrate end-to-end machine learning pipelines using Prefect or Airflow, covering DAG construction, task dependencies, retry logic, scheduling, monitoring, and integration with MLflow, DVC, and feature stores for production ML workflows. Use when automating multi-step ML workflows from data ingestion to deployment, scheduling periodic model retraining, coordinating distributed training tasks, or managing retry logic and failure recovery across pipeline stages.
See Extended Examples for complete configuration files and templates.
Build and orchestrate end-to-end machine learning pipelines with dependency management, scheduling, and monitoring.
Choose an appropriate framework and set up the infrastructure.
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Docker Compose for Airflow:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
Expected: The orchestration framework is installed, the web UI is accessible (Prefect at http://localhost:4200, Airflow at http://localhost:8080), the database is initialized, and the scheduler is running.
On failure: Check port availability (netstat -tulpn | grep 8080), verify the database connection, ensure Redis is running for Celery, check Python version compatibility (Airflow 2.8 requires Python ≥3.8), verify that the Docker daemon is running for a containerized setup, and inspect the logs for initialization errors.
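The port check above can also be scripted. The following is a minimal sketch using only the Python standard library; the helper name `port_is_open` is hypothetical, and the ports are the defaults mentioned above. It only confirms that something accepts TCP connections on the port, not that the service behind it is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


# Default UI ports named in the text (assumed unchanged in your setup).
for name, port in {"Prefect": 4200, "Airflow": 8080}.items():
    state = "listening" if port_is_open("localhost", port) else "not reachable"
    print(f"{name} UI on port {port}: {state}")
```

If a port reports "not reachable" right after startup, the service may still be initializing, so rerunning the check after a short wait is reasonable before digging into logs.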
Create a Prefect flow with a task for each pipeline stage.
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
Deploy and schedule:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
    # ... (see EXAMPLES.md for complete implementation)
Expected: The Prefect flow executes all tasks in the correct order, task failures trigger retries automatically, successful runs show green in the UI, MLflow logs the experiments, and the model is registered and deployed.
On failure: Check that task dependencies are defined correctly, verify that the MLflow server is accessible, ensure the data source paths are correct, check for circular dependencies, verify task timeout limits, inspect the Prefect logs for detailed errors, and check resource availability (memory/CPU).
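One way to check dependencies for cycles before wiring them into a flow is to describe the stages as a plain mapping and sort it topologically. This is a sketch using the standard library `graphlib` module (Python 3.9+), not a Prefect API; the stage names and the `execution_order` helper are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical stage names; each key maps to the stages it depends on.
PIPELINE = {
    "ingest": set(),
    "validate": {"ingest"},
    "features": {"validate"},
    "train": {"features"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the detected cycle.
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc


print(execution_order(PIPELINE))
```

Running the same check on a mapping such as `{"a": {"b"}, "b": {"a"}}` raises `ValueError`, which makes the circular dependency visible before any task is scheduled.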
Create an Airflow DAG for the production ML workflow.
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
Expected: The DAG appears in the Airflow UI, scheduled runs execute on time, task failures trigger retries and alerts, XCom passes data between tasks, and the MLflow integration logs experiments.
On failure: Check the DAG file syntax (python dags/ml_training_dag.py), verify that the imports are available in the Airflow environment, ensure XCom values do not exceed size limits (pass file paths for large data), check the email configuration for alerts, verify that the scheduler is running, and inspect the task logs in the Airflow UI.
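The advice to pass file paths instead of large payloads can be sketched as follows. The example uses only the standard library so it runs without Airflow; the helper names `save_rows` and `load_rows` and the file name are hypothetical. In a real DAG, the upstream callable would return the path (or push it under a key such as `data_path`) and the downstream task would pull it.

```python
import csv
import tempfile
from pathlib import Path


def save_rows(rows: list[dict], directory: str) -> str:
    """Write rows to a CSV file and return its path as a short string."""
    path = Path(directory) / "training_data.csv"  # hypothetical file name
    with path.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    # Only this string would travel between tasks, not the data itself.
    return str(path)


def load_rows(path: str) -> list[dict]:
    """Read the rows back in the downstream step (values come back as strings)."""
    with open(path, newline="") as fh:
        return list(csv.DictReader(fh))


with tempfile.TemporaryDirectory() as tmp:
    data_path = save_rows([{"x": 1, "y": 0}, {"x": 2, "y": 1}], tmp)
    print(data_path, load_rows(data_path))
```

With workers on separate machines, the path has to point at storage that every worker can reach, such as a shared volume or object store; a local temporary directory is used here only to keep the sketch runnable.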
Add dynamic DAGs, branching, and parallel execution.
# advanced_pipeline.py (Prefect)
import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package installed earlier
@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow branching:
# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
    """Decide which branch to take."""
    # Pull the file path pushed by an upstream task, then load the data
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)
    # ... (see EXAMPLES.md for complete implementation)
Expected: Parallel tasks execute concurrently (shortening the pipeline run), conditional branches execute based on the branching logic, dynamic task generation works, and the Dask cluster distributes work.
On failure: Check that the Dask cluster is configured and accessible, verify that task_runner is specified, ensure the branching function returns valid task IDs, check for resource contention between parallel tasks, and verify the conditional logic.
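The fan-out and fan-in shape behind parallel shard processing can be illustrated without a task runner. The sketch below uses a standard-library thread pool rather than Prefect or Dask; `process_shard` keeps the signature shown above, but its body and the `run_parallel` helper are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor


def process_shard(shard_id: int, data: list) -> dict:
    """Summarize one shard; a stand-in for the real per-shard work."""
    return {"shard_id": shard_id, "rows": len(data), "total": sum(data)}


def run_parallel(shards: list[list], max_workers: int = 4) -> list[dict]:
    """Process all shards concurrently and collect the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map preserves input order, so results line up with shard ids.
        return list(pool.map(process_shard, range(len(shards)), shards))


results = run_parallel([[1, 2], [3], [4, 5, 6]])
print(results)
```

Threads suit I/O-bound shards; for CPU-bound work a process pool or a Dask cluster, as the step describes, is usually the better fit.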
Add comprehensive monitoring and failure notifications.
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
# notify_failure must be defined before this decorator runs (elided here; see EXAMPLES.md)
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow monitoring with sensors:
# Airflow SLA and monitoring
from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
# slack_alert_failure and slack_alert_success are callbacks defined in the full example
default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
    # ... (see EXAMPLES.md for complete implementation)
Expected: Slack/email notifications are sent on failures, SLA violations trigger alerts, custom metrics are tracked, and logs are aggregated in the monitoring system.
On failure: Verify that the Slack webhook is configured correctly, check the email SMTP settings, ensure notification blocks load properly, verify that the SLA values are reasonable, and check for network issues blocking notifications.
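The failure-hook pattern can be tested in isolation before a real webhook is involved. The following is a framework-independent sketch, not the Prefect or Airflow callback API: the `notify_on_failure` decorator is hypothetical, and a plain list stands in for the Slack or email sender.

```python
import functools
from typing import Callable


def notify_on_failure(notify: Callable[[str], None]):
    """Wrap a function so that any exception is reported, then re-raised."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                notify(f"{func.__name__} failed: {exc!r}")
                raise  # keep the failure visible to the orchestrator
        return wrapper
    return decorator


sent: list[str] = []  # stand-in for a Slack or email sender


@notify_on_failure(sent.append)
def critical_task(value: int) -> int:
    """Hypothetical task body that rejects negative input."""
    if value < 0:
        raise ValueError("negative input")
    return value * 2


print(critical_task(2), sent)
```

Re-raising after the notification matters: if the wrapper swallowed the exception, the orchestrator would mark the task as successful and retries would never trigger.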
Version control and automate pipeline deployments.
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
Expected: Pipeline tests pass before deployment, deployment to production is automated, the team is notified on successful deployment, and pipeline versions are tracked in Git.
On failure: Check test coverage and failures, verify the Prefect Cloud credentials, ensure the deployment script handles errors, check the Slack webhook configuration, and inspect the CI logs for deployment errors.
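A deployment script that handles errors can be as simple as running each step in order and stopping at the first non-zero exit code, so that a failed test run never reaches the deploy step. The sketch below is one possible shape; the `run_steps` helper and the `tests/` directory are assumptions, while `deploy_prefect.py` is the script named earlier.

```python
import subprocess
import sys


def run_steps(steps: list[list[str]]) -> int:
    """Run commands in order; stop and return the first non-zero exit code."""
    for cmd in steps:
        result = subprocess.run(cmd)
        if result.returncode != 0:
            print(f"step failed ({result.returncode}): {' '.join(cmd)}", file=sys.stderr)
            return result.returncode
    return 0


# Example wiring (not executed here; the tests/ path is an assumption):
# steps = [
#     [sys.executable, "-m", "pytest", "tests/"],
#     [sys.executable, "deploy_prefect.py"],
# ]
# sys.exit(run_steps(steps))
```

Returning the failing step's exit code lets the CI job fail with the same status, which keeps the CI log and the script's own message consistent.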
track-ml-experiments - Integrate MLflow tracking into pipeline tasks
version-ml-data - Use DVC for data versioning in pipelines
build-feature-store - Materialize features as a pipeline task
deploy-ml-model-serving - Add deployment as the final pipeline stage
deploy-to-kubernetes - Run orchestrated pipelines on Kubernetes