.github/skills/multi-threading-processing/SKILL.md
Comprehensive guide for using multi-threading and multi-processing in Python, including when to choose each approach, best practices, and practical examples using the speedy_utils library.
npx skillsauth add anhvth/speedy_utils multi-threading-processingInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
This skill provides comprehensive guidance for implementing concurrent and parallel processing in Python applications. It covers when to use multi-threading vs multi-processing, best practices, performance considerations, and practical examples using the speedy_utils library's multi-worker implementations.
Best for:
When to choose threading:
# I/O-bound example: Downloading multiple files
import requests
from speedy_utils import multi_thread
def download_file(url):
response = requests.get(url)
return len(response.content)
urls = ['https://example.com/file1', 'https://example.com/file2']
results = multi_thread(download_file, urls, workers=10)
Best for:
When to choose processing:
# CPU-bound example: Heavy computation
from speedy_utils import multi_process
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
numbers = list(range(30, 35))
results = multi_process(fibonacci, numbers, workers=4)
| Factor | Multi-Threading | Multi-Processing | |--------|----------------|------------------| | GIL Impact | Limited by GIL | Bypasses GIL | | Memory | Shared memory | Separate memory spaces | | Communication | Easy (shared variables) | Complex (pickling required) | | Overhead | Low | High (process creation) | | Scalability | Limited by cores | Scales with cores | | Debugging | Easier | More complex |
The speedy_utils library provides multiple backends:
from speedy_utils import multi_process
# For I/O-bound tasks
results = multi_process(func, items, backend='threadpool')
# For CPU-bound tasks
results = multi_process(func, items, backend='mp') # multiprocessing
# For distributed computing (if Ray available)
results = multi_process(func, items, backend='ray')
import os
from speedy_utils import multi_thread
# For threading: Use more workers than CPU cores
cpu_count = os.cpu_count() or 4
thread_workers = cpu_count * 2 # Good starting point
# For processing: Match CPU cores (or slightly less)
process_workers = max(1, cpu_count - 1)
results = multi_thread(func, items, workers=thread_workers)
# Continue processing even if some tasks fail
results = multi_thread(
func,
items,
stop_on_error=False, # Don't abort on first error
workers=4
)
# Check for None values (failed tasks)
successful_results = [r for r in results if r is not None]
failed_count = results.count(None)
# For many small tasks, batch them to reduce overhead
def process_batch(items):
return [item * 2 for item in items]
results = multi_thread(
process_batch,
items,
batch=10, # Process 10 items per task
workers=4
)
from speedy_utils import kill_all_thread, cleanup_phantom_workers
# Force cleanup of stuck threads/processes
killed_threads = kill_all_thread()
killed_processes = cleanup_phantom_workers()
Threading: Shared memory, lower overhead
# Threads share memory - efficient for large datasets
large_data = load_large_dataset()
results = multi_thread(process_item, large_data, workers=8)
Processing: Separate memory spaces, higher overhead
# Each process gets a copy - use for CPU-bound work
results = multi_process(cpu_intensive_func, data, workers=4)
Threading: Easy communication
shared_results = []
lock = threading.Lock()
def thread_safe_append(item):
with lock:
shared_results.append(process_item(item))
multi_thread(thread_safe_append, items, workers=4)
Processing: Requires serialization
# Use return values or shared storage
def process_and_return(item):
result = heavy_computation(item)
return result # Pickled and sent back
results = multi_process(process_and_return, items, workers=4)
from speedy_utils import multi_thread
import pandas as pd
def process_dataframe_chunk(chunk):
# Process a chunk of data
return chunk['value'].sum()
# Split large dataframe into chunks
df = pd.read_csv('large_file.csv')
chunks = [df[i:i+1000] for i in range(0, len(df), 1000)]
results = multi_thread(process_dataframe_chunk, chunks, workers=8)
total = sum(results)
import aiohttp
from speedy_utils import multi_thread
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
def fetch_multiple_urls(urls):
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
return asyncio.run(main())
results = multi_thread(fetch_multiple_urls, [urls], workers=1) # Single async task
from speedy_utils import multi_process
from PIL import Image
import numpy as np
def process_image(image_path):
img = Image.open(image_path)
# Heavy image processing
processed = np.array(img) ** 2 # Example computation
return processed.mean()
image_paths = ['image1.jpg', 'image2.jpg', ...]
results = multi_process(process_image, image_paths, workers=4)
from speedy_utils import multi_thread
def risky_operation(x):
if x == 42:
raise ValueError(f"Bad input: {x}")
return x * 2
# Catch and handle errors
try:
results = multi_thread(risky_operation, range(100), stop_on_error=True)
except Exception as e:
print(f"Processing failed: {e}")
# Handle error appropriately
import logging
from speedy_utils import multi_thread
logging.basicConfig(level=logging.INFO)
def monitored_task(item):
logging.info(f"Processing item: {item}")
result = process_item(item)
logging.info(f"Completed item: {item}")
return result
results = multi_thread(monitored_task, items, workers=4)
from speedy_utils import multi_thread
def slow_task(x):
time.sleep(0.1) # Simulate work
return x ** 2
# Automatic progress bar
results = multi_thread(slow_task, range(100), progress=True)
from speedy_utils import multi_process
# Automatically uses Ray if available
results = multi_process(
heavy_computation,
large_dataset,
backend='ray',
workers=16 # Can exceed local cores
)
import pandas as pd
from speedy_utils import multi_thread
def process_row(row):
# Process a single row
return row['value'] * 2
df = pd.read_csv('data.csv')
# Process rows in parallel
results = multi_thread(process_row, df.to_dict('records'), workers=8)
processed_df = pd.DataFrame(results)
import time
from speedy_utils import multi_thread, multi_process
def benchmark_func(x):
# Some computation
return sum(i**2 for i in range(x))
data = list(range(100, 200))
# Benchmark threading
start = time.time()
thread_results = multi_thread(benchmark_func, data, workers=8, progress=False)
thread_time = time.time() - start
# Benchmark processing
start = time.time()
process_results = multi_process(benchmark_func, data, workers=4, progress=False, backend='safe')
process_time = time.time() - start
# Benchmark sequential
start = time.time()
sequential_results = [benchmark_func(x) for x in data]
sequential_time = time.time() - start
print(f"Threading: {thread_time:.2f}s")
print(f"Processing: {process_time:.2f}s")
print(f"Sequential: {sequential_time:.2f}s")
Problem: Python's GIL prevents true parallel execution in threads for CPU-bound tasks.
Solution: Use multi-processing for CPU-bound work.
Problem: Functions/objects can't be pickled for inter-process communication.
Solution: Define functions at module level, avoid lambdas.
# Good: Module-level function
def process_item(x):
return x * 2
# Bad: Lambda
lambda x: x * 2 # Can't be pickled
Problem: Large datasets copied to each process.
Solution: Use threading for large data, or lazy loading.
Problem: Threads/processes waiting indefinitely.
Solution: Use timeouts and proper cleanup.
results = multi_thread(
func,
items,
timeout=300, # 5 minute timeout
workers=4
)
from concurrent.futures import ThreadPoolExecutor
from speedy_utils.multi_worker.thread import _track_executor_threads
def custom_processing(items, workers=4):
with ThreadPoolExecutor(max_workers=workers) as executor:
_track_executor_threads(executor) # Track for cleanup
futures = [executor.submit(func, item) for item in items]
return [f.result() for f in futures]
import asyncio
from speedy_utils import multi_thread
async def async_task(x):
await asyncio.sleep(0.1)
return x * 2
def run_async_tasks(items):
async def main():
return await asyncio.gather(*[async_task(x) for x in items])
return asyncio.run(main())
# Run multiple async batches in parallel
batches = [items[i:i+10] for i in range(0, len(items), 10)]
results = multi_thread(run_async_tasks, batches, workers=4)
For processing large HuggingFace datasets or other big data, multi_process particularly shines when combined with data sharding patterns. See the dedicated dataset-processing-multiprocessing skill for:
Quick Example:
from speedy_utils import multi_process
from datasets import load_from_disk
def process_shard(args):
shard_id, start_idx, end_idx, dataset_path = args
ds = load_from_disk(dataset_path)
# Process slice [start_idx:end_idx]
transformed = [transform(ds[i]) for i in range(start_idx, end_idx)]
# Save result to temp location
return save_shard(transformed, shard_id)
# Distribute 100k rows across 4 workers
num_records = 100000
num_workers = 4
rows_per_worker = num_records // num_workers
worker_args = [
(i, i*rows_per_worker, (i+1)*rows_per_worker, 'path/to/dataset')
for i in range(num_workers)
]
shard_results = multi_process(process_shard, worker_args, workers=num_workers)
# Merge shard_results to get final dataset
This skill leverages the speedy_utils library's multi-worker implementations:
speedy_utils.multi_thread() - Advanced threading with batching, progress bars, and error handlingspeedy_utils.multi_process() - Multi-processing with multiple backends (Ray, multiprocessing, threading)speedy_utils.kill_all_thread() - Emergency thread cleanupspeedy_utils.cleanup_phantom_workers() - Process cleanup utilitiesSee the library's test files for additional examples and edge cases.
documentation
Guide for using vision utilities in speedy_utils, including fast GPU image loading, memory-mapped datasets, and notebook visualization.
development
Guide for creating new Agent Skills with proper structure, frontmatter, bundled assets, and validation. Includes templates, best practices, and examples for building reusable skill resources.
documentation
Comprehensive guide to using Ray for scalable distributed computing, including Ray Core, Data, Train, Tune, Serve, and RLlib with practical examples
tools
Reduce VS Code Python red squiggles to zero in this repository. Use when Pylance or pyright reports errors, when tools/check_syntax.py fails, or when a change should leave VS Code diagnostics clean.