skills/spark-optimization/SKILL.md
Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines.
npx skillsauth add ranbot-ai/awesome-skills spark-optimizationInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Production patterns for optimizing Apache Spark jobs including partitioning strategies, memory management, shuffle optimization, and performance tuning.
resources/implementation-playbook.md.Driver Program
↓
Job (triggered by action)
↓
Stages (separated by shuffles)
↓
Tasks (one per partition)
| Factor | Impact | Solution | |--------|--------|----------| | Shuffle | Network I/O, disk I/O | Minimize wide transformations | | Data Skew | Uneven task duration | Salting, broadcast joins | | Serialization | CPU overhead | Use Kryo, columnar formats | | Memory | GC pressure, spills | Tune executor memory | | Partitions | Parallelism | Right-size partitions |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create optimized Spark session
spark = (SparkSession.builder
.appName("OptimizedJob")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate())
# Read with optimized settings
df = (spark.read
.format("parquet")
.option("mergeSchema", "false")
.load("s3://bucket/data/"))
# Efficient transformations
result = (df
.filter(F.col("date") >= "2024-01-01")
.select("id", "amount", "category")
.groupBy("category")
.agg(F.sum("amount").alias("total")))
result.write.mode("overwrite").parquet("s3://bucket/output/")
# Calculate optimal partition count
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
"""
Optimal partition size: 128MB - 256MB
Too few: Under-utilization, memory pressure
Too many: Task scheduling overhead
"""
return max(int(data_size_gb * 1024 / partition_size_mb), 1)
# Repartition for even distribution
df_repartitioned = df.repartition(200, "partition_key")
# Coalesce to reduce partitions (no shuffle)
df_coalesced = df.coalesce(100)
# Partition pruning with predicate pushdown
df = (spark.read.parquet("s3://bucket/data/")
.filter(F.col("date") == "2024-01-01")) # Spark pushes this down
# Write with partitioning for future queries
(df.write
.partitionBy("year", "month", "day")
.mode("overwrite")
.parquet("s3://bucket/partitioned_output/"))
from pyspark.sql import functions as F
from pyspark.sql.types import *
# 1. Broadcast Join - Small table joins
# Best when: One side < 10MB (configurable)
small_df = spark.read.parquet("s3://bucket/small_table/") # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/") # TBs
# Explicit broadcast hint
result = large_df.join(
F.broadcast(small_df),
on="key",
how="left"
)
# 2. Sort-Merge Join - Default for large tables
# Requires shuffle, but handles any size
result = large_df1.join(large_df2, on="key", how="inner")
# 3. Bucket Join - Pre-sorted, no shuffle at join time
# Write bucketed tables
(df.write
.bucketBy(200, "customer_id")
.sortBy("customer_id")
.mode("overwrite")
.saveAsTable("bucketed_orders"))
# Join bucketed tables (no shuffle!)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers") # Same bucket count
result = orders.join(customers, on="customer_id")
# 4. Skew Join Handling
# Enable AQE skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# Manual salting for severe skew
def salt_join(df_skewed, df_other, key_col, num_salts=10):
"""Add salt to distribute skewed keys"""
# Add salt to skewed side
df_salted = df_skewed.withColumn(
"salt",
(F.rand() * num_salts).cast("int")
).withColumn(
"salted_key",
F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
)
# Explode other side with all salts
df_exploded = df_other.crossJoin(
spark.range(num_s
testing
Fix SEO indexing issues, crawl budget problems, and Search Console coverage errors for Next.js apps. Covers canonical tags, noindex audits, sitemap health, static rendering, and internal linking.
data-ai
Analyze AI disruption pressure across a business, map competitive exposure, and produce a 90-day defensive action plan.
tools
--- name: longbridge description: 125+ agent skills for Longbridge Securities — real-time quotes, charts, fundamentals, portfolio analysis, options, and more for HK/US/A-share/SG markets. Trilingual: Simplified Chinese, Traditional category: AI & Agents source: antigravity tags: [api, mcp, claude, ai, agent, security, cro] url: https://github.com/sickn33/antigravity-awesome-skills/tree/main/skills/longbridge --- # Longbridge ## Overview Longbridge is the official skill collection for Longbr
tools
Design, debug, and harden GitHub Actions CI/CD workflows, including reusable workflows, matrix builds, self-hosted runners, OIDC authentication, caching, environments, secrets, and release automation.