.claude/skills/spark-catalyst/SKILL.md
Expert guidance for working with the Apache Spark Catalyst query optimisation framework. Use this skill when working with Spark SQL internals, creating custom expressions, implementing query optimisations, working with logical/physical plans, or extending Catalyst. Trigger keywords include "catalyst", "spark sql", "expression", "logical plan", "physical plan", "tree node", "query optimisation", "rule executor", "analyzer", "optimizer", "code generation".
Apache Spark Catalyst API
You are an expert in the Apache Spark Catalyst query optimisation framework. This skill provides comprehensive guidance on using the Catalyst API for query processing, optimisation, and code generation.
Apache Spark Catalyst is the query optimisation framework that powers Spark SQL and DataFrames.
The Catalyst module is located at sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ and contains:
catalyst/
├── analysis/ # Query analysis and resolution
├── catalog/ # Catalog management and metadata
├── expressions/ # Expression definitions and evaluation
├── optimizer/ # Query optimisation rules
├── parser/ # SQL parsing
├── planning/ # Query planning strategies
├── plans/ # Query plan representations
│ ├── logical/ # Logical plan operators
│ └── physical/ # Physical plan operators
├── rules/ # Rule execution framework
├── trees/ # Tree node infrastructure
├── types/ # Data type utilities
└── util/ # Utility classes
TreeNode is the base class for all tree structures in Catalyst, including expressions and query plans.
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// Children of this node.
def children: Seq[BaseType]
// Transform this tree by applying a rule to all nodes (same as transformDown).
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType
// Transform all nodes top-down (pre-order): a parent before its children.
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType
// Transform all nodes bottom-up (post-order): children before their parent.
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType
// Equality check that short-circuits when both sides are the same object.
def fastEquals(other: TreeNode[_]): Boolean
// Tree pattern matching for efficient traversal.
def treePatternBits: BitSet
}
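Bottom-up transformation (children are rewritten before their parent):
plan.transformUp {
  // Drop filters whose condition is the literal TRUE.
  case Filter(Literal(true, BooleanType), child) => child
}
Top-down transformation (a parent is rewritten before its children):
plan.transformDown {
  case Project(projectList, child) =>
    // The rule sees this Project before its children are visited;
    // optimiseProject is a hypothetical helper returning a LogicalPlan.
    optimiseProject(projectList, child)
}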
Collect nodes matching a pattern:
val filters = plan.collect {
case f @ Filter(_, _) => f
}
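The difference between the two traversal orders is easiest to see by recording which nodes a rule visits. The sketch below is a deliberately tiny, self-contained model: `Node` and its `transformDown`, `transformUp` and `collect` methods are hypothetical stand-ins that mimic the shape of Catalyst's `TreeNode` API, not Spark code (the real methods also handle tree-pattern pruning, rule IDs and origin tracking).

```scala
// Hypothetical minimal tree node: a label plus children. NOT Catalyst code.
case class Node(label: String, children: List[Node] = Nil) {

  // Pre-order: apply the rule to this node first, then recurse into the
  // children of whatever the rule returned (like TreeNode.transformDown).
  def transformDown(rule: PartialFunction[Node, Node]): Node = {
    val afterRule = rule.applyOrElse(this, identity[Node])
    afterRule.copy(children = afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: transform the children first, then apply the rule to the
  // rebuilt node (like TreeNode.transformUp).
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val withNewChildren = copy(children = children.map(_.transformUp(rule)))
    rule.applyOrElse(withNewChildren, identity[Node])
  }

  // Gather results for every node the partial function is defined at,
  // visiting nodes in pre-order (like TreeNode.collect).
  def collect[B](pf: PartialFunction[Node, B]): List[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}

// Project -> Filter -> Scan as a tiny tree.
val tree = Node("Project", List(Node("Filter", List(Node("Scan")))))

// Record the visit order with a rule that matches everything and changes nothing.
val downOrder = scala.collection.mutable.ListBuffer.empty[String]
tree.transformDown { case n => downOrder += n.label; n }

val upOrder = scala.collection.mutable.ListBuffer.empty[String]
tree.transformUp { case n => upOrder += n.label; n }

// Remove every Filter node by replacing it with its only child.
val noFilters = tree.transformUp { case Node("Filter", List(child)) => child }

val labels = tree.collect { case Node(label, _) => label }

println(downOrder.toList) // List(Project, Filter, Scan)
println(upOrder.toList)   // List(Scan, Filter, Project)
println(noFilters)        // Node(Project,List(Node(Scan,List())))
```

Note that a pre-order rewrite does not re-apply the rule to the node it has just produced, only to that node's children; rewrites that need repeated application rely on a fixed-point batch in a `RuleExecutor`.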
Expression is the base class for all expression trees in Catalyst.
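Expressions fall into groups by their number of children:
- Leaf expressions (e.g. Literal, AttributeReference)
- Unary expressions (e.g. Cast, Not, IsNull)
- Binary expressions (e.g. Add, EqualTo, And)
- Ternary expressions (e.g. If, Substring)
abstract class Expression extends TreeNode[Expression] {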
// Can this expression be evaluated at query planning time?
def foldable: Boolean
// Does this expression always return the same result for fixed inputs?
def deterministic: Boolean
// Can this expression evaluate to null?
def nullable: Boolean
// Data type of the expression result.
def dataType: DataType
// Evaluate this expression given an input row.
def eval(input: InternalRow): Any
// Generate code for this expression.
def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode
// Attributes referenced by this expression.
def references: AttributeSet
}
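To show how `foldable`, `nullable` and `eval` fit together, here is a self-contained sketch over a hypothetical three-class expression language (`Lit`, `Attr`, `Plus`; none of these are Catalyst classes). It follows the same conventions as Catalyst: a literal is foldable, an attribute is not, a binary operator is foldable only when both children are, and, like most `BinaryExpression`s, `Plus` evaluates to null when either input is null. Rows are modelled as a `Map` from column name to value, an assumption made to keep the example independent of Spark.

```scala
object MiniExpr {
  // Hypothetical miniature of Catalyst's Expression contract (not Spark code).
  sealed trait Expr {
    def children: Seq[Expr]
    // Foldable: can be evaluated without an input row, i.e. during planning.
    def foldable: Boolean
    // Nullable: may evaluate to null for some input.
    def nullable: Boolean
    // Rows are modelled as column name -> value (null allowed).
    def eval(row: Map[String, Any]): Any
  }

  case class Lit(value: Any) extends Expr {
    def children: Seq[Expr] = Nil
    def foldable: Boolean = true
    def nullable: Boolean = value == null
    def eval(row: Map[String, Any]): Any = value
  }

  case class Attr(name: String, nullable: Boolean) extends Expr {
    def children: Seq[Expr] = Nil
    def foldable: Boolean = false // depends on the input row
    def eval(row: Map[String, Any]): Any = row(name)
  }

  // Integer addition that returns null if either side is null.
  case class Plus(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
    def foldable: Boolean = left.foldable && right.foldable
    def nullable: Boolean = left.nullable || right.nullable
    def eval(row: Map[String, Any]): Any = {
      val l = left.eval(row)
      if (l == null) null
      else {
        val r = right.eval(row)
        if (r == null) null else l.asInstanceOf[Int] + r.asInstanceOf[Int]
      }
    }
  }
}

import MiniExpr._

val constant = Plus(Lit(1), Lit(2))
val withColumn = Plus(Attr("age", nullable = true), Lit(1))

println(constant.foldable)                   // true
println(constant.eval(Map.empty))            // 3
println(withColumn.foldable)                 // false
println(withColumn.eval(Map("age" -> 41)))   // 42
println(withColumn.eval(Map("age" -> null))) // null
```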
Literal values:
Literal(42) // Integer literal
Literal("hello") // String literal
Literal(null, StringType) // Null literal
Attribute references:
AttributeReference("name", StringType, nullable = true)()
AttributeReference("age", IntegerType, nullable = false)()
Predicates:
EqualTo(left, right) // left = right
GreaterThan(left, right) // left > right
LessThanOrEqual(left, right) // left <= right
And(left, right) // left AND right
Or(left, right) // left OR right
Not(child) // NOT child
Arithmetic:
Add(left, right) // left + right
Subtract(left, right) // left - right
Multiply(left, right) // left * right
Divide(left, right) // left / right
String operations:
Substring(str, pos, len) // substring(str, pos, len)
Upper(child) // upper(child)
Lower(child) // lower(child)
Concat(children) // concat(child1, child2, ...)
Type conversion:
Cast(child, targetType) // cast(child as targetType)
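`And`, `Or` and `Not` follow SQL's three-valued logic, which matters when writing rules that simplify predicates: `false AND NULL` is `false`, but `true AND NULL` is `NULL`. The self-contained sketch below reproduces those truth tables with `None` standing in for SQL `NULL`; `ThreeValuedLogic` and its methods are illustrative names, not part of Catalyst.

```scala
// SQL three-valued logic, with None standing in for NULL.
object ThreeValuedLogic {
  type SqlBool = Option[Boolean]

  def and(l: SqlBool, r: SqlBool): SqlBool = (l, r) match {
    case (Some(false), _) | (_, Some(false)) => Some(false) // false dominates
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None        // otherwise unknown
  }

  def or(l: SqlBool, r: SqlBool): SqlBool = (l, r) match {
    case (Some(true), _) | (_, Some(true)) => Some(true) // true dominates
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // NOT NULL is NULL.
  def not(b: SqlBool): SqlBool = b.map(!_)
}

import ThreeValuedLogic._

println(and(Some(false), None)) // Some(false)
println(and(Some(true), None))  // None
println(or(Some(true), None))   // Some(true)
println(not(None))              // None
```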
QueryPlan is the base class for both logical and physical query plans.
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
extends TreeNode[PlanType] {
// Output schema of this plan node.
def output: Seq[Attribute]
// Set of output attributes.
def outputSet: AttributeSet
// Set of attributes from all children.
def inputSet: AttributeSet
// Attributes produced by this node.
def producedAttributes: AttributeSet
// Attributes referenced by expressions.
def references: AttributeSet
// Attributes referenced but not provided by children.
def missingInput: AttributeSet
// All expressions in this plan node.
def expressions: Seq[Expression]
// Transform expressions in this plan.
def transformExpressions(rule: PartialFunction[Expression, Expression]): PlanType
}
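The relationship between `references`, `inputSet` and `missingInput` can be checked with a small model: a node's missing input is roughly `references -- inputSet`, the attributes its own expressions use that no child produces. In the hypothetical `MiniPlan` below, attributes are plain column names and produced attributes are ignored, whereas Catalyst uses `AttributeSet`s keyed on expression IDs.

```scala
object MiniPlan {
  // Attributes are modelled as plain column names in this sketch.
  sealed trait Plan {
    def children: Seq[Plan]
    def output: Seq[String]
    // Names referenced by this node's own expressions.
    def references: Set[String]
    // Everything the children produce.
    def inputSet: Set[String] = children.flatMap(_.output).toSet
    // Referenced but not supplied by any child: non-empty means an invalid plan.
    def missingInput: Set[String] = references -- inputSet
  }

  case class Relation(output: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
    def references: Set[String] = Set.empty
  }

  case class Project(columns: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def output: Seq[String] = columns
    def references: Set[String] = columns.toSet
  }

  case class Filter(conditionColumns: Set[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def output: Seq[String] = child.output // a filter keeps its child's schema
    def references: Set[String] = conditionColumns
  }
}

import MiniPlan._

val people = Relation(Seq("name", "age"))
val valid = Filter(Set("age"), Project(Seq("name", "age"), people))
val invalid = Filter(Set("age"), Project(Seq("name"), people))

println(valid.output)         // List(name, age)
println(valid.missingInput)   // Set()
println(invalid.missingInput) // Set(age)
```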
Logical plans represent query semantics without execution strategy.
Data sources:
// Read from a relation.
LogicalRelation(relation, output, catalogTable)
// Local in-memory data.
LocalRelation(output, data)
// Empty relation: a LocalRelation with no rows.
LocalRelation(output, data = Seq.empty)
Projections and filters:
// Project specific columns.
Project(projectList: Seq[NamedExpression], child: LogicalPlan)
// Filter rows.
Filter(condition: Expression, child: LogicalPlan)
// Select distinct rows.
Distinct(child: LogicalPlan)
Aggregations:
// Group by and aggregate.
Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan
)
Joins:
// Join two relations.
Join(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
condition: Option[Expression],
hint: JoinHint
)
Sorting:
// Sort rows.
Sort(
order: Seq[SortOrder],
global: Boolean,
child: LogicalPlan
)
Limits:
// Limit number of rows.
Limit(limitExpr: Expression, child: LogicalPlan)
Set operations:
// Union of one or more relations (UNION ALL semantics; SQL UNION adds a Distinct on top).
Union(children: Seq[LogicalPlan])
// Intersection.
Intersect(left: LogicalPlan, right: LogicalPlan, isAll: Boolean)
// Difference.
Except(left: LogicalPlan, right: LogicalPlan, isAll: Boolean)
InternalRow is the internal representation of a row in Catalyst.
abstract class InternalRow extends SpecializedGetters {
// Number of fields in this row.
def numFields: Int
// Check if field at ordinal is null.
def isNullAt(ordinal: Int): Boolean
// Get value at ordinal.
def get(ordinal: Int, dataType: DataType): Any
// Specialised getters for primitive types.
def getBoolean(ordinal: Int): Boolean
def getByte(ordinal: Int): Byte
def getShort(ordinal: Int): Short
def getInt(ordinal: Int): Int
def getLong(ordinal: Int): Long
def getFloat(ordinal: Int): Float
def getDouble(ordinal: Int): Double
def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal
def getUTF8String(ordinal: Int): UTF8String
// Update value at ordinal.
def update(ordinal: Int, value: Any): Unit
// Set field to null.
def setNullAt(ordinal: Int): Unit
// Create a copy of this row.
def copy(): InternalRow
// Convert to Scala sequence.
def toSeq(schema: StructType): Seq[Any]
}
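A practical consequence of this interface is that operators frequently reuse one mutable row object for every record they emit (an `UnsafeProjection`, for instance, returns the same `UnsafeRow` instance on each call), so code that buffers rows must call `copy()` first. The sketch below demonstrates the pitfall with a hypothetical `MutableRow` class standing in for a mutable `InternalRow`; it is a self-contained illustration, not Spark code.

```scala
object RowReuse {
  // Hypothetical stand-in for a mutable InternalRow: fixed ordinals, nullable slots.
  final class MutableRow(numFields: Int) {
    private val values = new Array[Any](numFields)
    def isNullAt(ordinal: Int): Boolean = values(ordinal) == null
    def getInt(ordinal: Int): Int = values(ordinal).asInstanceOf[Int]
    def update(ordinal: Int, value: Any): Unit = values(ordinal) = value
    def setNullAt(ordinal: Int): Unit = values(ordinal) = null
    def copy(): MutableRow = {
      val c = new MutableRow(numFields)
      for (i <- 0 until numFields) c.update(i, values(i))
      c
    }
  }

  // Emits each input value through ONE reused row object, as many operators do.
  def emit(inputs: Seq[Int])(consume: MutableRow => Unit): Unit = {
    val reused = new MutableRow(1)
    inputs.foreach { v =>
      reused.update(0, v)
      consume(reused)
    }
  }
}

import RowReuse._

// Buffering the row itself keeps three references to the same object.
val withoutCopy = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
emit(Seq(1, 2, 3))(row => withoutCopy += row)

// Buffering a copy keeps each record's values.
val withCopy = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
emit(Seq(1, 2, 3))(row => withCopy += row.copy())

println(withoutCopy.map(_.getInt(0))) // ArrayBuffer(3, 3, 3)
println(withCopy.map(_.getInt(0)))    // ArrayBuffer(1, 2, 3)
```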
Rules define tree transformations, and RuleExecutor applies them in batches.
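// Basic rule: drop filters whose condition is the literal TRUE.
object MyOptimisationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case Filter(Literal(true, BooleanType), child) => child
  }
}
// Configurable rule. Rule already exposes `conf` (SQLConf.get), so a
// constructor parameter with that name must be declared as an override.
case class MyParameterisedRule(override val conf: SQLConf) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // "spark.sql.myFeature.enabled" is a hypothetical configuration key.
    if (conf.getConfString("spark.sql.myFeature.enabled", "false").toBoolean) {
      optimisePlan(plan) // hypothetical helper
    } else {
      plan
    }
  }
}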
abstract class RuleExecutor[TreeType <: TreeNode[_]] {
// Define batches of rules to execute.
protected def batches: Seq[Batch]
// Execute all batches on the plan.
def execute(plan: TreeType): TreeType
}
// Batch execution strategies.
abstract class Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy
object MyOptimiser extends RuleExecutor[LogicalPlan] {
val batches = Seq(
Batch("Normalisation", Once,
EliminateSubqueryAliases,
RemoveRedundantAliases
),
Batch("Operator Optimisation", FixedPoint(100),
PushDownPredicates,
ConstantFolding,
ColumnPruning
),
Batch("Join Reordering", Once,
CostBasedJoinReorder
)
)
}
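The difference between `Once` and `FixedPoint` shows up with a rule that cannot finish its work in one pass. In the self-contained sketch below, a top-down rule removes double negation; because a pre-order rewrite does not revisit the node it just produced, `NOT NOT NOT NOT a` only loses one pair of negations per pass, so a `Once` batch stops early while a `FixedPoint` batch keeps going until the tree stops changing. `MiniRuleExecutor`, `Batch`, `Once` and `FixedPoint` here are simplified re-creations, not Catalyst's classes; the real `RuleExecutor` also records per-rule timing and logs a warning when a fixed-point batch hits its iteration limit.

```scala
object MiniRuleExecutor {
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class Not(child: Expr) extends Expr

  // Pre-order rewrite: apply the rule to a node, then to the children of the result.
  def transformDown(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(e, identity[Expr]) match {
      case Not(child) => Not(transformDown(child)(rule))
      case leaf       => leaf
    }

  type Rule = Expr => Expr
  val removeDoubleNegation: Rule = e => transformDown(e) { case Not(Not(x)) => x }

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy
  case class Batch(name: String, strategy: Strategy, rules: Rule*)

  // Runs batches in order; returns the final tree and the iterations used per batch.
  def execute(batches: Seq[Batch], input: Expr): (Expr, Seq[Int]) =
    batches.foldLeft((input, Vector.empty[Int])) { case ((tree, counts), batch) =>
      var current = tree
      var iteration = 0
      var changed = true
      // Stop when an iteration changes nothing or the strategy's limit is reached.
      while (changed && iteration < batch.strategy.maxIterations) {
        val next = batch.rules.foldLeft(current)((t, rule) => rule(t))
        changed = next != current
        current = next
        iteration += 1
      }
      (current, counts :+ iteration)
    }
}

import MiniRuleExecutor._

val input = Not(Not(Not(Not(Col("a")))))

val (afterOnce, onceIterations) =
  execute(Seq(Batch("Simplify", Once, removeDoubleNegation)), input)
val (afterFixedPoint, fixedPointIterations) =
  execute(Seq(Batch("Simplify", FixedPoint(100), removeDoubleNegation)), input)

println(afterOnce)            // Not(Not(Col(a)))
println(afterFixedPoint)      // Col(a)
println(fixedPointIterations) // Vector(3)
```

The third fixed-point iteration is the one that confirms nothing changed.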
The Analyzer resolves unresolved logical plans by binding attributes, functions, and tables.
val analyzer = new Analyzer(catalogManager)
val analysedPlan = analyzer.execute(unresolvedPlan)
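At its core, attribute resolution replaces placeholders such as `UnresolvedAttribute` with references bound to one specific column of a child's output, identified by an expression ID. The self-contained sketch below models only that step, using case-insensitive matching as Spark does by default (`spark.sql.caseSensitive=false`) and treating ambiguous names as errors. `UnresolvedAttr`, `AttrRef` and `resolve` are hypothetical names; the real Analyzer also resolves relations, functions and `*`, and reports errors through its own exception types.

```scala
object MiniAnalyzer {
  sealed trait Expr
  // A name the parser produced but nothing has bound yet.
  case class UnresolvedAttr(name: String) extends Expr
  // A bound column: exprId identifies it independently of its name.
  case class AttrRef(name: String, exprId: Long) extends Expr

  // Resolve against the child's output. Left(...) reports a missing or
  // ambiguous column; already-resolved expressions pass through unchanged.
  def resolve(e: Expr, childOutput: Seq[AttrRef]): Either[String, Expr] = e match {
    case UnresolvedAttr(name) =>
      childOutput.filter(_.name.equalsIgnoreCase(name)) match {
        case Seq(single) => Right(single)
        case Seq()       => Left(s"cannot resolve column '$name'")
        case _           => Left(s"ambiguous reference to '$name'")
      }
    case resolved => Right(resolved)
  }
}

import MiniAnalyzer._

val output = Seq(AttrRef("id", 1L), AttrRef("Name", 2L))

println(resolve(UnresolvedAttr("name"), output))  // Right(AttrRef(Name,2))
println(resolve(UnresolvedAttr("email"), output)) // Left(cannot resolve column 'email')
```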
The Optimizer transforms logical plans to improve query performance.
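Predicate pushdown:
// Push filters through non-join operators such as Project and Aggregate.
PushPredicateThroughNonJoin
// Push filters through joins, into either side or the join condition.
PushPredicateThroughJoin
// Combines CombineFilters, PushPredicateThroughNonJoin and PushPredicateThroughJoin.
PushDownPredicates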
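The core idea of pushing a filter below a projection can be shown on a toy plan: when a `Filter` sits directly on a `Project` that only passes columns through, the two can be swapped so that rows are discarded before projection. The sketch below is a hypothetical, self-contained model (conditions are reduced to the set of columns they read); Catalyst's `PushPredicateThroughNonJoin` additionally substitutes aliases into the condition, requires the projection to be deterministic and covers many more operators.

```scala
object MiniPushdown {
  sealed trait Plan
  case class Scan(columns: Seq[String]) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  // The condition is kept abstract: just the set of columns it reads.
  case class Filter(conditionColumns: Set[String], child: Plan) extends Plan

  // Rewrite top-down. The Project here only passes columns through, so every
  // column the filter reads is also available below the Project.
  def pushFilterBelowProject(plan: Plan): Plan = plan match {
    case Filter(cond, Project(cols, grandChild)) =>
      Project(cols, pushFilterBelowProject(Filter(cond, grandChild)))
    case Filter(cond, child)  => Filter(cond, pushFilterBelowProject(child))
    case Project(cols, child) => Project(cols, pushFilterBelowProject(child))
    case scan: Scan           => scan
  }
}

import MiniPushdown._

val before = Filter(Set("age"), Project(Seq("name", "age"), Scan(Seq("name", "age", "email"))))
val after = pushFilterBelowProject(before)

println(after)
// Project(List(name, age),Filter(Set(age),Scan(List(name, age, email))))
```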
Projection pushdown:
// Eliminate unnecessary columns.
ColumnPruning
// Combine adjacent projections.
CollapseProject
Constant folding:
// Evaluate constant expressions.
ConstantFolding
// Simplify expressions.
SimplifyConditionals
SimplifyCasts
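Constant folding relies on `foldable`: any subtree that does not depend on input can be evaluated once during optimisation and replaced with a literal, which is what `ConstantFolding` does with Catalyst `Literal`s. The sketch below applies the same idea to a hypothetical integer expression type (`Lit`, `Col`, `Add`, `Mul`, not Catalyst classes), replacing the largest foldable subtrees it finds while walking down from the root.

```scala
object MiniConstantFolding {
  sealed trait Expr {
    // As in Catalyst, an expression is foldable when it does not depend on input.
    def foldable: Boolean = this match {
      case _: Lit    => true
      case _: Col    => false
      case Add(l, r) => l.foldable && r.foldable
      case Mul(l, r) => l.foldable && r.foldable
    }
  }
  case class Lit(value: Int) extends Expr
  case class Col(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr
  case class Mul(left: Expr, right: Expr) extends Expr

  // Evaluate an expression that is known to be foldable.
  def evalConstant(e: Expr): Int = e match {
    case Lit(v)    => v
    case Add(l, r) => evalConstant(l) + evalConstant(r)
    case Mul(l, r) => evalConstant(l) * evalConstant(r)
    case Col(n)    => throw new IllegalArgumentException(s"column $n is not constant")
  }

  // Replace each maximal foldable subtree with a literal of its value.
  def constantFold(e: Expr): Expr = e match {
    case lit: Lit                => lit
    case other if other.foldable => Lit(evalConstant(other))
    case Add(l, r)               => Add(constantFold(l), constantFold(r))
    case Mul(l, r)               => Mul(constantFold(l), constantFold(r))
    case col: Col                => col
  }
}

import MiniConstantFolding._

// price * (1 + 2 * 3)  ==>  price * 7
val folded = constantFold(Mul(Col("price"), Add(Lit(1), Mul(Lit(2), Lit(3)))))
println(folded) // Mul(Col(price),Lit(7))
```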
Join optimisation:
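// Reorder joins using cost estimates (requires spark.sql.cbo.enabled and
// spark.sql.cbo.joinReorder.enabled).
CostBasedJoinReorder
// Convert outer joins to inner (or less outer) joins when a filter rejects
// the null-extended rows.
EliminateOuterJoin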
Subquery optimisation:
// Decorrelate correlated subqueries.
DecorrelateInnerQuery
// Merge scalar subqueries.
MergeScalarSubqueries
Catalyst generates Java source code at runtime and compiles it to JVM bytecode with the Janino compiler.
CodegenContext:
Manages code generation state including variable declarations, functions, and class structure.
ExprCode:
Represents generated code for an expression evaluation.
case class ExprCode(
code: Block, // Generated code block
isNull: ExprValue, // Variable for null check
value: ExprValue // Variable for result value
)
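The `code`/`isNull`/`value` triple composes naturally: each child emits its own block and variable names, and the parent appends code that reads them. The self-contained sketch below emits Java source text for integer addition in that style, treating the result as null when either input is null. `MiniExprCode`, `Context.freshName` and the expression classes are hypothetical stand-ins for `ExprCode`, `CodegenContext.freshName` and Catalyst expressions, and the input row is assumed to be named `i`; nothing here compiles the generated Java.

```scala
object MiniCodegen {
  // Stand-in for ExprCode: a code block plus the names of its null flag and value.
  case class MiniExprCode(code: String, isNull: String, value: String)

  // Stand-in for CodegenContext.freshName: unique variable names per generated class.
  final class Context {
    private var counter = 0
    def freshName(prefix: String): String = { counter += 1; s"${prefix}_$counter" }
  }

  sealed trait Expr
  case class Lit(value: Int) extends Expr
  case class InputInt(ordinal: Int) extends Expr // reads an int column from row `i`
  case class Add(left: Expr, right: Expr) extends Expr

  def genCode(e: Expr, ctx: Context): MiniExprCode = {
    val isNull = ctx.freshName("isNull")
    val value = ctx.freshName("value")
    e match {
      case Lit(v) =>
        MiniExprCode(s"boolean $isNull = false;\nint $value = $v;", isNull, value)
      case InputInt(ord) =>
        MiniExprCode(
          s"boolean $isNull = i.isNullAt($ord);\nint $value = $isNull ? -1 : i.getInt($ord);",
          isNull, value)
      case Add(l, r) =>
        val lc = genCode(l, ctx)
        val rc = genCode(r, ctx)
        // Children first, then this node; -1 is just a placeholder for null results.
        val code = Seq(
          lc.code,
          rc.code,
          s"boolean $isNull = ${lc.isNull} || ${rc.isNull};",
          s"int $value = -1;",
          s"if (!$isNull) { $value = ${lc.value} + ${rc.value}; }"
        ).mkString("\n")
        MiniExprCode(code, isNull, value)
    }
  }
}

import MiniCodegen._

val generated = genCode(Add(InputInt(0), Lit(1)), new Context)
println(generated.code)
// boolean isNull_3 = i.isNullAt(0);
// int value_4 = isNull_3 ? -1 : i.getInt(0);
// boolean isNull_5 = false;
// int value_6 = 1;
// boolean isNull_1 = isNull_3 || isNull_5;
// int value_2 = -1;
// if (!isNull_1) { value_2 = value_4 + value_6; }
```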
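import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._

// Simplified version of Spark's CodegenFallback (the real trait also special-cases
// leaf, nondeterministic and non-nullable expressions).
trait CodegenFallback extends Expression {
  // Fall back to interpreted evaluation: generated code calls eval() on this object.
  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // addReferenceObj adds `this` to ctx.references and returns a Java term for it,
    // so there is no need to append to ctx.references separately.
    val objectTerm = ctx.addReferenceObj("expression", this)
    // A fresh name avoids clashes when several fallbacks share one generated method.
    val resultTerm = ctx.freshName("result")
    ev.copy(code = code"""
      Object $resultTerm = $objectTerm.eval(${ctx.INPUT_ROW});
      boolean ${ev.isNull} = $resultTerm == null;
      ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
      if (!${ev.isNull}) {
        ${ev.value} = (${CodeGenerator.boxedType(dataType)}) $resultTerm;
      }
    """)
  }
}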
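// Expressions must first be bound to the input schema (for example with
// BindReferences.bindReferences) so that attributes become BoundReferences.
// Generate unsafe projection from expressions.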
val projection = GenerateUnsafeProjection.generate(expressions)
val result = projection(inputRow)
// Generate mutable projection.
val mutableProjection = GenerateMutableProjection.generate(expressions)
val outputRow = mutableProjection(inputRow)
// Generate safe projection.
val safeProjection = GenerateSafeProjection.generate(expressions)
// Generate predicate for filter condition.
val predicate = GeneratePredicate.generate(condition)
val passes = predicate.eval(row)
// Generate row comparator.
val ordering = GenerateOrdering.generate(sortOrders)
val compareResult = ordering.compare(row1, row2)
Atomic types:
BooleanType
ByteType
ShortType
IntegerType
LongType
FloatType
DoubleType
StringType
BinaryType
DateType
TimestampType
TimestampNTZType
Complex types:
// Array type.
ArrayType(elementType: DataType, containsNull: Boolean)
// Map type.
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
// Struct type.
StructType(fields: Seq[StructField])
// Struct field.
StructField(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata)
Decimal types:
DecimalType(precision: Int, scale: Int)
DecimalType.SYSTEM_DEFAULT // Decimal(38, 18)
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class MyCustomFunction(child: Expression)
    extends UnaryExpression with ExpectsInputTypes {
  // Accept only string input; the analyzer reports a type mismatch otherwise.
  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
  // Define output type.
  override def dataType: DataType = StringType
  // Null input gives null output, so nullability follows the child.
  override def nullable: Boolean = child.nullable
  // Interpreted path: UnaryExpression.eval calls this only for non-null input.
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[UTF8String].toUpperCase()
  // Generated path: defineCodeGen wraps this snippet in the same null check.
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"($c).toUpperCase()")
  // Required since Spark 3.2 to rebuild the node with a new child.
  override protected def withNewChildInternal(newChild: Expression): MyCustomFunction =
    copy(child = newChild)
  // Name used in EXPLAIN output and error messages.
  override def prettyName: String = "my_custom_function"
}
object EliminateRedundantCasts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
case Cast(child, dataType, _, _) if child.dataType == dataType =>
// Remove cast if types match.
child
}
}
plan match {
case Project(projectList, child) =>
// Handle projection.
case Filter(condition, Project(projectList, child)) =>
// Handle filter over projection.
case Join(left, right, joinType, Some(condition), _) =>
// Handle join with condition.
case Aggregate(grouping, aggregates, child) =>
// Handle aggregation.
case _ =>
// Default case.
}
// Find all attribute references.
val attributes = expr.collect {
case a: AttributeReference => a
}
// Find all subqueries.
val subqueries = expr.collect {
case s: SubqueryExpression => s
}
// Transform specific expression types.
val transformed = expr.transformUp {
// Add has a third field (evalMode) in recent Spark versions.
case Add(Literal(0, _), right, _) => right
case Add(left, Literal(0, _), _) => left
}
// Resolve attribute by name.
def resolve(attrName: String, input: LogicalPlan): Option[Attribute] = {
input.output.find(_.name == attrName)
}
// Resolve with qualifier.
def resolveQualified(
qualifier: Seq[String],
attrName: String,
input: LogicalPlan): Option[Attribute] = {
input.output.find { attr =>
attr.qualifier.startsWith(qualifier) && attr.name == attrName
}
}
// Create schema from attributes.
val schema = StructType(attributes.map { attr =>
StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
})
// Convert schema to attributes.
val attributes = schema.toAttributes
// Add column to schema.
val newSchema = schema.add("newColumn", StringType, nullable = true)
// Drop column from schema.
val reducedSchema = StructType(schema.filterNot(_.name == "dropColumn"))
test("my custom function") {
val input = Literal("hello")
val expr = MyCustomFunction(input)
// Test evaluation.
assert(expr.eval(null) == UTF8String.fromString("HELLO"))
// Test properties.
assert(expr.dataType == StringType)
assert(expr.foldable)
assert(expr.deterministic)
}
test("eliminate redundant casts") {
  // Reuse one attribute so the input relation really produces x.
  val x = AttributeReference("x", IntegerType)()
  val testRelation = LocalRelation(x)
  val plan = Project(Seq(Alias(Cast(x, IntegerType), "y")()), testRelation)
  val optimised = EliminateRedundantCasts(plan)
  val expected = Project(Seq(Alias(x, "y")()), testRelation)
  // comparePlans normalises expression IDs, so the two Alias(...)() calls still match.
  comparePlans(optimised, expected)
}
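`comparePlans` matters here because every `AttributeReference(...)()` or `Alias(...)()` call allocates a fresh `ExprId`, so two structurally identical plans built separately are usually not `==`; `PlanTest` normalises expression IDs before comparing. The self-contained sketch below reproduces only that normalisation step on a hypothetical `Attr`/`Project` pair; it is not the `PlanTest` implementation, which also normalises other plan details.

```scala
object MiniPlanCompare {
  private val nextId = new java.util.concurrent.atomic.AtomicLong(0)

  // Like AttributeReference/Alias, every new attribute gets a fresh, unique ID.
  case class Attr(name: String, exprId: Long)
  def attr(name: String): Attr = Attr(name, nextId.getAndIncrement())

  case class Project(columns: Seq[Attr], table: String)

  // Set every expression ID to 0 so only names and structure are compared.
  def normalizeIds(p: Project): Project =
    p.copy(columns = p.columns.map(_.copy(exprId = 0L)))

  def comparePlans(actual: Project, expected: Project): Unit = {
    val (a, e) = (normalizeIds(actual), normalizeIds(expected))
    assert(a == e, s"Plans differ:\n$a\n$e")
  }
}

import MiniPlanCompare._

val actual = Project(Seq(attr("y")), "t")
val expected = Project(Seq(attr("y")), "t")

println(actual == expected) // false: the two attributes have different IDs
comparePlans(actual, expected) // passes after normalisation
```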
- outputSet and references.
- fastEquals: checks reference equality first and falls back to structural equality (==) only when the two nodes are different objects.
- doGenCode for custom expressions.
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/
Key extension points:
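- Custom expressions (Expression)
- Analysis rules (Rule[LogicalPlan])
- Optimisation rules (Rule[LogicalPlan])
- Data sources (TableProvider)
- Aggregate functions (AggregateFunction)

When working with Catalyst, focus on immutability, type safety, and efficient tree traversal patterns.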