Parallel/distributed computing for larger-than-RAM data. Components: DataFrames (parallel pandas), Arrays (parallel NumPy), Bags, Futures, Schedulers. Scales laptop to HPC cluster. For single-machine speed use polars; for out-of-core without cluster use vaex.
Dask is a Python library for parallel and distributed computing that scales familiar pandas/NumPy APIs to larger-than-memory datasets. It provides five main components (DataFrames, Arrays, Bags, Futures, Schedulers) and scales from single-machine multi-core to multi-node HPC clusters.
```bash
pip install "dask[complete]"     # All components
pip install "dask[dataframe]"    # DataFrames only
pip install "dask[distributed]"  # Distributed scheduler + dashboard
pip install dask-jobqueue        # HPC cluster integration (SLURM, PBS)
```
```python
import dask.dataframe as dd

# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# ...or read only selected columns from a Parquet dataset
ddf = dd.read_parquet('data/', columns=['id', 'value', 'category'])

# Operations are lazy until .compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').agg({'value': ['mean', 'sum']}).compute()
print(result.shape)  # (n_categories, 2)

# Custom operations via map_partitions (preferred over apply).
# Note: mean/std here are computed per partition, not over the whole dataset.
def normalize_partition(df):
    # Return a new frame instead of mutating the input partition
    return df.assign(norm_value=(df['value'] - df['value'].mean()) / df['value'].std())

ddf = ddf.map_partitions(normalize_partition)

# Joins (lookup_ddf: another DataFrame with a 'category' column, defined elsewhere)
ddf_merged = ddf.merge(lookup_ddf, on='category', how='left')

# Write results
ddf.to_parquet('output/', engine='pyarrow')

# Repartitioning for optimal chunk sizes
ddf = ddf.repartition(npartitions=20)          # By count
ddf = ddf.repartition(partition_size='100MB')  # By size

# Index management for sorted operations
# (assumes a 'timestamp' column that is already sorted across partitions)
ddf = ddf.set_index('timestamp', sorted=True)

# Debugging
print(f"Partitions: {ddf.npartitions}")
print(f"Dtypes: {ddf.dtypes}")
sample = ddf.get_partition(0).compute()  # Inspect first partition
```
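```python
import dask.array as da
import numpy as np

# Create from various sources (alternatives; the shapes below assume the first)
x = da.random.random((100000, 1000), chunks=(10000, 1000))
x = da.from_array(np_array, chunks=(10000, 1000))  # np_array: an existing in-memory NumPy array
x = da.from_zarr('large_dataset.zarr')

# Standard operations (lazy)
y = (x - x.mean(axis=0)) / x.std(axis=0)  # Normalize
z = da.dot(x.T, x)                        # Matrix multiply
u, s, v = da.linalg.svd(x)                # SVD (x must be chunked along one axis only)

# Compute (returns a NumPy array)
result = y.mean(axis=0).compute()
print(result.shape)  # (1000,)

# Custom operations with map_blocks
def custom_filter(block):
    from scipy.ndimage import gaussian_filter
    return gaussian_filter(block, sigma=2)

# Each block is filtered independently, so values near chunk edges differ from a
# whole-array filter; x.map_overlap(custom_filter, depth=8, boundary='reflect')
# shares an 8-cell halo (sigma=2 x scipy's default truncate=4.0) between blocks.
filtered = da.map_blocks(custom_filter, x, dtype=x.dtype)

# Rechunking for different access patterns
x_rechunked = x.rechunk({0: 5000, 1: 500})

# Save to disk
da.to_zarr(y, 'normalized.zarr')
```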
```python
import dask.bag as db
import json

# Read unstructured data (one JSON object per line)
bag = db.read_text('logs/*.json').map(json.loads)

# Functional operations
valid = bag.filter(lambda x: x['status'] == 'success')
ids = valid.pluck('user_id')
flat = bag.map(lambda x: x['tags']).flatten()

# Aggregation: use foldby instead of groupby (groupby requires a full shuffle)
totals = bag.foldby(
    key='category',
    binop=lambda total, x: total + x['amount'],
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0,
).compute()  # list of (category, total_amount) pairs

# Convert to DataFrame for structured analysis
ddf = valid.to_dataframe(meta={'user_id': 'str', 'amount': 'float64', 'category': 'str'})
```
```python
from dask.distributed import Client, as_completed
import numpy as np

client = Client()  # Local cluster with all cores
print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status

# Submit individual tasks (executes immediately, not lazy)
def process(x, param):
    return x ** param

future = client.submit(process, 42, param=2)
print(future.result())  # 1764

# Map over many inputs
futures = client.map(process, range(100), param=2)
results = client.gather(futures)
print(len(results))  # 100

# Scatter large data to workers (avoids repeated transfers)
big_data = np.random.random((10000, 1000))
data_future = client.scatter(big_data, broadcast=True)

# Submit tasks using scattered data
# (process_chunk is a hypothetical per-chunk function)
def process_chunk(data, i):
    return data[i::10].sum()

futures = [client.submit(process_chunk, data_future, i) for i in range(10)]
results = client.gather(futures)

# Progressive result processing
for future in as_completed(futures):
    result = future.result()
    print(f"Completed: {result}")

# Coordination primitives
from dask.distributed import Lock, Queue, Event

lock = Lock('resource-lock')
with lock:
    # Only one holder of 'resource-lock' at a time across the cluster
    pass

client.close()
```
```python
import dask

# Global scheduler setting
dask.config.set(scheduler='threads')      # Default for Arrays/DataFrames: GIL-releasing numeric work
dask.config.set(scheduler='processes')    # Default for Bags: pure Python, GIL-bound work
dask.config.set(scheduler='synchronous')  # Debugging with pdb

# Context manager for temporary change
# (computation: any Dask collection, e.g. a DataFrame or Array expression)
with dask.config.set(scheduler='synchronous'):
    result = computation.compute()  # Can use pdb here

# Per-compute override
result = ddf.mean().compute(scheduler='processes')

# Distributed scheduler with resource control
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
print(client.dashboard_link)
```

```python
# HPC cluster integration
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=24, memory='100GB',
    walltime='02:00:00', queue='regular'
)
cluster.scale(jobs=10)  # Request 10 SLURM jobs
client = Client(cluster)

# Adaptive scaling (replaces the fixed scale() request above;
# minimum/maximum count workers, minimum_jobs/maximum_jobs count SLURM jobs)
cluster.adapt(minimum=2, maximum=20)

result = computation.compute()
client.close()
```
| Data Type | Component | When to Use |
|-----------|-----------|-------------|
| Tabular (CSV, Parquet) | DataFrames | Standard pandas-like operations at scale |
| Numeric arrays (HDF5, Zarr) | Arrays | NumPy operations, linear algebra, image processing |
| Text, JSON, logs | Bags | ETL/cleaning → convert to DataFrame for analysis |
| Custom parallel tasks | Futures | Dynamic workflows, parameter sweeps, task dependencies |
| Any of the above | Schedulers | Control execution backend (threads/processes/distributed) |
Control level: DataFrames/Arrays/Bags = high-level lazy API. Futures = low-level immediate execution.
All DataFrames, Arrays, and Bags build a task graph — nothing executes until .compute() or .persist().
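A minimal sketch of this laziness, assuming only that Dask is installed. It uses `dask.delayed` (not otherwise covered in this document) because it makes it easy to count how often a task actually runs; `load`, `total`, `count`, and the `calls` list are hypothetical names for illustration. The synchronous scheduler keeps every task in the calling thread so the shared list is visible.

```python
import dask

calls = []  # records every real execution of load()


@dask.delayed
def load():
    calls.append("load")
    return list(range(10))


@dask.delayed
def total(xs):
    return sum(xs)


@dask.delayed
def count(xs):
    return len(xs)


data = load()  # builds a task; nothing runs yet
s = total(data)
n = count(data)
print(calls)  # [] -- only a task graph exists so far

# One pass: the shared load() task runs once and feeds both results
s_val, n_val = dask.compute(s, n, scheduler="synchronous")
print(s_val, n_val, calls)  # 45 10 ['load']

# Two separate passes: load() runs again for each .compute()
s.compute(scheduler="synchronous")
n.compute(scheduler="synchronous")
print(len(calls))  # 3
```

Because `s` and `n` share the `load()` task, one `dask.compute` call runs it once, while separate `.compute()` calls rebuild it each time.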
- `.compute()`: execute and return the result to local memory
- `.persist()`: execute and keep the result on the workers (in local memory with the local schedulers) for reuse across multiple computations
- `dask.compute(a, b, c)`: compute multiple results in a single pass, sharing intermediates

Target: ~100 MB per chunk, with chunks small enough that about 10 of them per core fit in worker memory.
| Chunk Size | Effect |
|-----------|--------|
| Too large (>1 GB) | Memory overflow, poor parallelization |
| Optimal (~100 MB) | Good parallelism, manageable memory |
| Too small (<1 MB) | Excessive scheduling overhead |
Example: 8 cores, 32 GB RAM → chunks should stay below ~400 MB (32 GB / 8 cores / 10 chunks per core), so the ~100 MB target fits comfortably within that bound.
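The sizing rule reduces to simple arithmetic. The sketch below uses hypothetical helper names (`max_chunk_bytes`, `rows_per_chunk`) and decimal units, and assumes a 2-D float64 array whose chunks span all 1000 columns, like `chunks=(rows, 1000)` in the Arrays example.

```python
def max_chunk_bytes(ram_bytes: float, cores: int, chunks_per_core: int = 10) -> float:
    """Upper bound on chunk size so that chunks_per_core chunks per core fit in RAM."""
    return ram_bytes / cores / chunks_per_core


def rows_per_chunk(n_cols: int, itemsize: int, chunk_bytes: float) -> int:
    """Rows per chunk for a 2-D array whose chunks span all columns."""
    return int(chunk_bytes // (n_cols * itemsize))


GB, MB = 1e9, 1e6  # decimal units, as in the example above

upper = max_chunk_bytes(32 * GB, cores=8)  # 8 cores, 32 GB RAM
target = min(100 * MB, upper)              # the ~100 MB rule of thumb is below the bound
rows = rows_per_chunk(n_cols=1000, itemsize=8, chunk_bytes=target)  # float64 = 8 bytes

print(upper / MB, target / MB, rows)  # 400.0 100.0 12500
```

In practice, `chunks='auto'` lets Dask choose chunk shapes from its `array.chunk-size` configuration value, so arithmetic like this is mainly useful for checking an explicit chunk shape against the memory budget.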
| Scheduler | Overhead | Best For | GIL |
|-----------|----------|----------|-----|
| threads (default for Arrays/DataFrames) | ~10 µs/task | NumPy, pandas, scikit-learn | Affected |
| processes (default for Bags) | ~10 ms/task | Pure Python, text processing | Not affected |
| synchronous | ~1 µs/task | Debugging with pdb | N/A |
| distributed | ~1 ms/task | Dashboard, clusters, advanced features | Configurable |
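One way to see why `synchronous` is the debugging choice: it runs every task in the calling thread, so breakpoints and tracebacks behave as in ordinary Python. This sketch records which thread processed each element of a small Bag; the Bag is only a convenient container here, and the element values are arbitrary.

```python
import threading

import dask.bag as db

main_thread = threading.get_ident()

# Replace each element with the id of the thread that processed it
bag = db.from_sequence(range(8), npartitions=4).map(lambda _: threading.get_ident())

sync_ids = bag.compute(scheduler="synchronous")
print(set(sync_ids) == {main_thread})  # True: all tasks ran in the calling thread

# With the threaded scheduler the same tasks are dispatched to a thread pool
threaded_ids = bag.compute(scheduler="threads")
print(len(threaded_ids))  # 8
```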
```python
import dask.dataframe as dd

# Extract: Read all CSV files
ddf = dd.read_csv('raw_data/*.csv', dtype={'amount': 'float64'})

# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].fillna(0)
ddf = ddf.dropna(subset=['category'])

# Aggregate
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean', 'count']})
# Flatten the (column, statistic) MultiIndex; Parquet needs string column names
summary.columns = ['_'.join(col) for col in summary.columns]

# Load: Save as Parquet (columnar, compressed)
summary.to_parquet('output/summary.parquet')

# len() triggers another full pass over the CSV files
print(f"Processed {len(ddf)} rows across {ddf.npartitions} partitions")
```
```python
import dask.array as da

# Load large scientific dataset
x = da.from_zarr('experiment_data.zarr')  # e.g., (50000, 50000) float64
print(f"Shape: {x.shape}, Chunks: {x.chunks}")

# Normalize per-column
x_norm = (x - x.mean(axis=0)) / x.std(axis=0)

# Compute covariance matrix (lazy; a 50000 x 50000 float64 result is ~20 GB)
cov = da.dot(x_norm.T, x_norm) / (x_norm.shape[0] - 1)

# SVD for dimensionality reduction (top-k)
u, s, v = da.linalg.svd_compressed(x_norm, k=50)

# Save results: rows of v are the principal axes; u * s gives per-sample scores
da.to_zarr(v, 'pca_components.zarr')

# Fraction of the total variance explained by each of the top 5 components
total_var = (x_norm ** 2).sum()
print(f"Explained variance (top 5): {(s[:5] ** 2 / total_var).compute()}")
```
This workflow combines the Bags and DataFrames examples above: read JSON logs with Bags, filter and transform them, then convert to a DataFrame for groupby analysis. Each step maps directly to one of the API examples above.
| Parameter | Module | Default | Description |
|-----------|--------|---------|-------------|
| npartitions | DataFrame | auto | Number of partitions (controls parallelism) |
| partition_size | DataFrame | — | Target size per partition (e.g., '100MB') |
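| chunks | Array | 'auto' | Chunk dimensions (e.g., (10000, 1000)); 'auto' sizes chunks from the array.chunk-size setting |
| blocksize | Bag | None (one partition per file) | Bytes per partition in db.read_text (e.g., '128 MiB') |
| scheduler | All | 'threads' (Arrays, DataFrames), 'processes' (Bags) | Execution backend ('threads', 'processes', 'synchronous') |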
| n_workers | Distributed | auto | Number of worker processes |
| threads_per_worker | Distributed | auto | Threads per worker |
| memory_limit | Distributed | auto | Per-worker memory limit (e.g., '4GB') |
| sorted | set_index | False | Whether data is pre-sorted (enables optimizations) |
| meta | map_partitions | — | Output DataFrame/Series structure template |
Let Dask handle data loading — Never load data into pandas/numpy first then convert. Use dd.read_csv() / da.from_zarr() directly.
Batch compute calls — Use dask.compute(a, b, c) instead of calling .compute() in loops. Allows sharing intermediates.
Use map_partitions over apply: ddf.apply(func, axis=1) calls func once per row in Python. ddf.map_partitions(func) calls func once per partition with a whole pandas DataFrame, so func can use vectorized pandas operations.
Persist reused intermediates — Call .persist() on data accessed multiple times, then del when done.
Use the dashboard — client.dashboard_link shows task progress, memory usage, worker states. Essential for diagnosing performance issues.
Anti-pattern — Excessively large task graphs: If len(ddf.__dask_graph__()) returns millions, increase chunk sizes or use map_partitions/map_blocks to fuse operations.
Anti-pattern — Wrong scheduler for workload: Using threads for pure Python text processing (GIL-bound) or processes for NumPy operations (unnecessary serialization overhead).
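A quick way to see how chunk size drives graph size, using an arbitrary 10,000 x 10,000 array of ones. Exact task counts depend on the Dask version, so the sketch only compares them; the block counts follow directly from the chunk shapes.

```python
import dask.array as da

shape = (10_000, 10_000)

small = da.ones(shape, chunks=(100, 100))      # 100 x 100 blocks
large = da.ones(shape, chunks=(5_000, 5_000))  # 2 x 2 blocks

# Same computation, very different task graphs
small_graph = (small + 1).sum().__dask_graph__()
large_graph = (large + 1).sum().__dask_graph__()

print(small.numblocks, large.numblocks)  # (100, 100) (2, 2)
print(len(small_graph), len(large_graph))
```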
```python
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split
import dask.array as da

X = da.random.random((100000, 50), chunks=(10000, 50))
y = da.random.randint(0, 2, size=100000, chunks=10000)

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
print(f"Train: {X_train.shape}, Test: {X_test.shape}")
```
```python
from dask.distributed import Client

client = Client()

class RunningStats:
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        self.count += 1
        self.total += value
        return self.total / self.count

# Create actor on a worker
stats = client.submit(RunningStats, actor=True).result()

# Call methods (~1ms roundtrip)
for v in [10, 20, 30]:
    mean = stats.add(v).result()
    print(f"Running mean: {mean}")

client.close()
```
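```python
# Requires the Dask Kubernetes operator to be installed in the target cluster
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client

cluster = KubeCluster(name='my-dask-cluster')
cluster.adapt(minimum=2, maximum=50)
client = Client(cluster)

# Run computation on auto-scaling cluster
# (large_computation: any Dask collection)
result = large_computation.compute()
client.close()
```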
| Problem | Cause | Solution |
|---------|-------|----------|
| MemoryError during .compute() | Result too large for local memory | Write output with .to_parquet(), or use .persist() on a distributed cluster so results stay on the workers |
| Slow computation start | Task graph has millions of tasks | Increase chunk sizes; use map_partitions/map_blocks |
| Poor parallelization | GIL contention with threads scheduler | Switch to scheduler='processes' for Python-heavy code |
| TypeError in map_partitions | Missing or wrong meta parameter | Provide meta=pd.DataFrame({'col': pd.Series(dtype='float64')}) |
| Workers killed (OOM) | Chunks exceed worker memory | Decrease chunk size; increase memory_limit |
| KilledWorker exception | Worker process crashed | Check worker logs; reduce memory per task; increase memory_limit |
| Slow joins/merges | Data not indexed on join key | Set the join key as the index first with ddf.set_index('key'); add sorted=True only if the data is already sorted |
| NotImplementedError | Operation not supported by Dask | Use map_partitions with pandas equivalent |
| Dashboard not accessible | Distributed client not started | Use client = Client() to enable distributed scheduler |
| Data type mismatch across partitions | Inconsistent CSV files | Specify dtype explicitly in dd.read_csv() |
- references/collections_guide.md: Detailed DataFrames, Arrays, and Bags guide with comprehensive code examples for reading, transforming, aggregating, and writing data. Covers map_partitions patterns, the meta parameter, chunking strategies, map_blocks, foldby, and collection conversion. Consolidated from the original dataframes.md, arrays.md, and bags.md.
- references/distributed_computing.md: Futures API, distributed coordination primitives (Locks, Queues, Events, Variables), Actors, scheduler configuration, HPC cluster setup (SLURM, Kubernetes), adaptive scaling, dashboard monitoring, and performance profiling. Consolidated from the original futures.md and schedulers.md.

Not migrated: The original had 6 reference files. best-practices.md content was consolidated into the Best Practices section and Key Concepts (chunk strategy, scheduler selection). The remaining content is organized into 2 reference files covering the 5 main components.