.claude/skills/pathling-python/SKILL.md
Comprehensive cheat sheet for using the Pathling Python API. Use this skill when working with FHIR data in Python, running SQL on FHIR queries, using terminology functions, encoding FHIR resources, or any other Pathling Python operations. Trigger keywords include "pathling", "pathling python", "fhir encoding", "sql on fhir python", "terminology functions", "member_of", "translate", "subsumes", "PathlingContext".
You are an expert in using the Pathling Python API for working with FHIR data in Python applications and data science workflows.
Prerequisites:
pip install pathling
The main entry point for all Pathling operations. It manages the Spark session and provides access to data reading, encoding, and terminology functions.
Creating a basic context:
from pathling import PathlingContext
pc = PathlingContext.create()
Creating a context with terminology server authentication:
pc = PathlingContext.create(
    terminology_server_url='https://ontology.nhs.uk/production1/fhir',
    token_endpoint='https://ontology.nhs.uk/authorisation/auth/realms/nhs-digital-terminology/protocol/openid-connect/token',
    client_id='[client ID]',
    client_secret='[client secret]'
)
Creating a context with caching:
pc = PathlingContext.create(
    terminology_server_url="http://localhost:8081/fhir",
    terminology_verbose_request_logging=True,
    cache_override_expiry=2_628_000,
    cache_storage_type="disk",
    cache_storage_path=".local/tx-cache"
)
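Hard-coding the client ID and secret, as in the authentication example, makes them easy to leak. The following is a minimal sketch of collecting the keyword arguments from the environment instead. The environment variable names (`PATHLING_TX_URL` and so on) and the `context_kwargs` helper are hypothetical and chosen only for illustration; Pathling does not define them.

```python
import os

# Hypothetical environment variable names, mapped to the
# PathlingContext.create() keyword arguments used in the examples above.
ENV_TO_KWARG = {
    "PATHLING_TX_URL": "terminology_server_url",
    "PATHLING_TOKEN_ENDPOINT": "token_endpoint",
    "PATHLING_CLIENT_ID": "client_id",
    "PATHLING_CLIENT_SECRET": "client_secret",
}


def context_kwargs(env=None):
    """Return only the keyword arguments whose environment variable is set."""
    env = os.environ if env is None else env
    return {
        kwarg: env[name]
        for name, kwarg in ENV_TO_KWARG.items()
        if env.get(name)
    }


# Example with an explicit mapping instead of the real environment.
kwargs = context_kwargs({"PATHLING_TX_URL": "http://localhost:8081/fhir"})
print(kwargs)
```

With pathling installed, the result could then be passed on as `PathlingContext.create(**kwargs)`.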
Accessing the Spark session:
pc.spark # Access the underlying Spark session
pc.spark.sparkContext.setLogLevel("DEBUG") # Set log level
NDJSON is a common format for bulk FHIR data, with one JSON resource per line.
# Read each line from the NDJSON into a row within a Spark data set.
ndjson_dir = '/some/path/ndjson/'
json_resources = pc.spark.read.text(ndjson_dir)
# Convert the data set of strings into a structured FHIR data set.
patients = pc.encode(json_resources, 'Patient')
# Do some stuff.
patients.select('id', 'gender', 'birthDate').show()
Using the DataSource API:
data = pc.read.ndjson("/some/file/location")
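To make the file format concrete, here is a plain-Python sketch of what "one JSON resource per line" means. It uses only the standard library and made-up sample resources; it illustrates the NDJSON layout and is not how Pathling's Spark encoders are implemented.

```python
import json

# Two illustrative resources, one JSON document per line.
ndjson_text = "\n".join([
    json.dumps({"resourceType": "Patient", "id": "p1", "gender": "female"}),
    json.dumps({"resourceType": "Observation", "id": "o1", "status": "final"}),
])


def resources_of_type(text, resource_type):
    """Parse NDJSON text and keep the resources of the requested type."""
    resources = (json.loads(line) for line in text.splitlines() if line.strip())
    return [r for r in resources if r.get("resourceType") == resource_type]


patients = resources_of_type(ndjson_text, "Patient")
print([p["id"] for p in patients])
```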
FHIR Bundles contain collections of related resources.
# Read each Bundle into a row within a Spark data set.
bundles_dir = '/some/path/bundles/'
bundles = pc.spark.read.text(bundles_dir, wholetext=True)
# Convert the data set of strings into a structured FHIR data set.
patients = pc.encode_bundle(bundles, 'Patient')
# XML Bundles can be encoded by specifying the input type (requires: from pathling import MimeType).
# patients = pc.encode_bundle(bundles, 'Patient', input_type=MimeType.FHIR_XML)
# Do some stuff.
patients.select('id', 'gender', 'birthDate').show()
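Conceptually, encoding a Bundle for one resource type means picking the matching resources out of the Bundle's `entry` array. A standard-library sketch of that selection, using a made-up Bundle and a hypothetical `bundle_resources` helper (this shows the data shape only, not Pathling's implementation):

```python
import json

# A small illustrative Bundle containing three resources.
bundle_json = json.dumps({
    "resourceType": "Bundle",
    "type": "collection",
    "entry": [
        {"resource": {"resourceType": "Patient", "id": "p1"}},
        {"resource": {"resourceType": "Condition", "id": "c1"}},
        {"resource": {"resourceType": "Patient", "id": "p2"}},
    ],
})


def bundle_resources(text, resource_type):
    """Return the resources of one type from a JSON Bundle string."""
    bundle = json.loads(text)
    return [
        entry["resource"]
        for entry in bundle.get("entry", [])
        if entry.get("resource", {}).get("resourceType") == resource_type
    ]


patient_ids = [r["id"] for r in bundle_resources(bundle_json, "Patient")]
print(patient_ids)
```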
# Read from previously persisted Delta tables.
data = pc.read.tables()
# Get available resource types.
data.resource_types()
# Read a specific resource.
patients = data.read('Patient')
patients.count()
SQL on FHIR views project FHIR data into easy-to-use tabular forms.
result = data.view(
    resource="Patient",
    select=[
        {"column": [{"path": "getResourceKey()", "name": "patient_id"}]},
        {
            "forEach": "address",
            "column": [
                {"path": "line.join('\\n')", "name": "street"},
                {"path": "use", "name": "use"},
                {"path": "city", "name": "city"},
                {"path": "postalCode", "name": "zip"},
            ],
        },
    ],
)
result.show()
view_ds = data.view(
    resource='Patient',
    select=[
        {
            'column': [
                {'path': 'id', 'name': 'id'},
                {'path': 'gender', 'name': 'gender'},
                {'path': "telecom.where(system='phone').value", 'name': 'phone_numbers', 'collection': True},
            ]
        }
    ],
    where=[
        {'path': "gender = 'male'"},
    ]
)
view_ds.show()
view_ds = data.view(
    resource='Patient',
    select=[
        {
            'forEach': 'name',
            'column': [
                {'path': 'use', 'name': 'name_use'},
                {'path': 'family', 'name': 'family_name'},
            ],
            'select': [
                {
                    'forEachOrNull': 'given',
                    'column': [
                        {'path': '$this', 'name': 'given_name'},
                    ],
                }
            ]
        },
    ]
)
view_ds.show()
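The difference between `forEach` and `forEachOrNull` in the nested view is easiest to see on a single resource. The sketch below mimics the unnesting in plain Python, on a made-up Patient: `forEach` yields no rows for an empty collection, while `forEachOrNull` yields one row with a null value. The helper names are hypothetical and this is an illustration of the row semantics, not a view runner.

```python
def for_each(items):
    """forEach: one row per element, no rows when the collection is empty."""
    return list(items or [])


def for_each_or_null(items):
    """forEachOrNull: one row per element, or a single None row when empty."""
    return list(items or []) or [None]


def name_rows(patient):
    """Mimic the nested view: forEach over name, forEachOrNull over given."""
    rows = []
    for name in for_each(patient.get("name")):
        for given in for_each_or_null(name.get("given")):
            rows.append((name.get("use"), name.get("family"), given))
    return rows


patient = {
    "name": [
        {"use": "official", "family": "Smith", "given": ["Ann", "Mary"]},
        {"use": "nickname", "family": "Smith"},  # no given names
    ]
}
rows = name_rows(patient)
for row in rows:
    print(row)
```

The name without given names still produces a row (with a null `given_name`), whereas a Patient with no `name` at all produces no rows because the outer level uses `forEach`.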
Terminology functions require a FHIR terminology server to be configured.
to_coding:
from pathling import to_coding
# Convert a column containing codes into a Coding struct.
coding_column = to_coding(df.CODE, 'http://snomed.info/sct')
# With version.
coding_column = to_coding(df.CODE, 'http://snomed.info/sct', version='http://snomed.info/sct/32506021000036107/version/20250831')
to_snomed_coding:
from pathling import to_snomed_coding
# Shorthand for SNOMED CT codes.
coding_column = to_snomed_coding(df.CODE)
coding_column = to_snomed_coding(df.CODE, version='http://snomed.info/sct/32506021000036107/version/20250831')
to_loinc_coding:
from pathling.functions import to_loinc_coding
# Shorthand for LOINC codes.
coding_column = to_loinc_coding(df.CODE)
Coding class:
from pathling import Coding
# Create a Coding object.
coding = Coding('http://snomed.info/sct', '232208008')
# Create a SNOMED Coding using the factory method.
snomed_coding = Coding.of_snomed('232208008')
# Convert to a Spark literal column.
coding_literal = coding.to_literal()
to_ecl_value_set:
from pathling import to_ecl_value_set
# Convert a SNOMED CT ECL expression into a FHIR ValueSet URI.
viral_infection_ecl = """
<< 64572001|Disease| : (
<< 370135005|Pathological process| = << 441862004|Infectious process|,
<< 246075003|Causative agent| = << 49872002|Virus|
)
"""
value_set_uri = to_ecl_value_set(viral_infection_ecl)
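For orientation, SNOMED CT implicit value sets defined by ECL are conventionally addressed with a URI of the form `http://snomed.info/sct?fhir_vs=ecl/[expression]`. The sketch below builds such a URI with the standard library. The function name `ecl_value_set_uri` is hypothetical, and it is an assumption that `to_ecl_value_set` percent-encodes the expression in exactly this way.

```python
from urllib.parse import quote

SNOMED_URI = "http://snomed.info/sct"


def ecl_value_set_uri(ecl):
    """Build an implicit ValueSet URI from an ECL expression (illustrative)."""
    # Percent-encode the expression so it is safe inside a URI.
    return f"{SNOMED_URI}?fhir_vs=ecl/{quote(ecl)}"


uri = ecl_value_set_uri("<< 64572001")
print(uri)
```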
Test if a code is a member of a value set.
from pathling import member_of, to_coding, to_snomed_coding, to_ecl_value_set
# Using an ECL expression.
result = csv.select(
    "CODE",
    "DESCRIPTION",
    member_of(
        to_snomed_coding(csv.CODE),
        to_ecl_value_set("<< 34014006 |Viral disease|")
    ).alias("VIRAL_INFECTION")
)
result.show()
# Using a predefined value set.
result = csv.select(
    "CODE",
    "DESCRIPTION",
    member_of(
        to_coding(csv.CODE, 'http://loinc.org'),
        'http://hl7.org/fhir/ValueSet/observation-vitalsignresult'
    ).alias("IS_VITAL_SIGN")
)
Alternative syntax using PathlingContext:
from pyspark.sql.functions import col
result = pc.member_of(
    transformed_df,
    to_snomed_coding(col("primary_diagnosis_concept")),
    to_ecl_value_set("<< 34014006 |Viral disease|"),
    "Viral Infection"
)
Translate codes from one code system to another.
from pathling import translate, to_coding
# Translate SNOMED CT codes to Read CTV3.
result = pc.translate(
    csv,
    to_coding(csv.CODE, 'http://snomed.info/sct'),
    'http://snomed.info/sct/900000000000207008?fhir_cm=900000000000497000',
    output_column_name='READ_CODE'
)
# Extract just the code from the Coding struct.
result = result.withColumn('READ_CODE', result.READ_CODE.code)
result.select('CODE', 'DESCRIPTION', 'READ_CODE').show()
Test if one code is equal to or a subtype of another code.
from pathling import subsumes, to_snomed_coding, Coding
# Test if codes are subsumed by a specific code.
# 232208008 |Ear, nose and throat disorder|
left_coding = Coding('http://snomed.info/sct', '232208008')
right_coding_column = to_snomed_coding(csv.CODE)
result = pc.subsumes(
    csv,
    'IS_ENT',
    left_coding=left_coding,
    right_coding_column=right_coding_column
)
result.select('CODE', 'DESCRIPTION', 'IS_ENT').show()
Using subsumed_by (reverse order):
from pathling import subsumed_by, to_snomed_coding, Coding
# Test if each code in a column is subsumed by a specific code.
result = csv.withColumn(
    'IS_SUBTYPE',
    subsumed_by(
        to_snomed_coding(csv.CODE),
        Coding('http://snomed.info/sct', '232208008').to_literal()
    )
)
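The direction of the two tests is easy to confuse. The sketch below spells out the relationship on a toy is-a hierarchy in plain Python: `subsumes(left, right)` is true when `right` is equal to, or a descendant of, `left`, and `subsumed_by` is the same test with the arguments swapped. The codes are made up for illustration and are not SNOMED CT content; a real terminology server answers this from the code system itself.

```python
# Toy is-a hierarchy with made-up codes (child -> parents).
PARENTS = {
    "ear-infection": ["ent-disorder", "infection"],
    "ent-disorder": ["disorder"],
    "infection": ["disorder"],
    "disorder": [],
}


def ancestors_or_self(code):
    """Collect the code itself plus everything reachable via parent links."""
    seen, stack = set(), [code]
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(PARENTS.get(current, []))
    return seen


def subsumes(left, right):
    """True when right is equal to, or a descendant of, left."""
    return left in ancestors_or_self(right)


def subsumed_by(left, right):
    """True when left is equal to, or a descendant of, right."""
    return subsumes(right, left)


print(subsumes("ent-disorder", "ear-infection"))
print(subsumed_by("ear-infection", "disorder"))
```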
Retrieve properties associated with codes in terminologies.
from pathling import property_of, to_snomed_coding, PropertyType
# Get the parent codes for each code in the dataset.
parents = csv.withColumn(
    "PARENTS",
    property_of(to_snomed_coding(csv.CODE), "parent", PropertyType.CODE)
)
# Split each parent code into a separate row.
exploded_parents = parents.selectExpr(
    "CODE", "DESCRIPTION", "explode_outer(PARENTS) AS PARENT"
)
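Spark's `explode_outer` produces one row per array element and, unlike `explode`, keeps rows whose array is empty or null by emitting a single null. A plain-Python sketch of that behaviour on made-up rows (the local `explode_outer` function is an illustration, not the Spark function):

```python
def explode_outer(rows, array_key, out_key):
    """Emit one row per array element; empty or missing arrays yield one None."""
    exploded = []
    for row in rows:
        base = {k: v for k, v in row.items() if k != array_key}
        for value in (row.get(array_key) or [None]):
            exploded.append({**base, out_key: value})
    return exploded


rows = [
    {"CODE": "A", "PARENTS": ["X", "Y"]},
    {"CODE": "B", "PARENTS": []},
    {"CODE": "C", "PARENTS": None},
]
exploded = explode_outer(rows, "PARENTS", "PARENT")
for row in exploded:
    print(row)
```

Codes without any parent values therefore stay in the result, with a null `PARENT`.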
PropertyType values:
PropertyType.CODE - Returns an array of codes.
PropertyType.STRING - Returns an array of strings.
PropertyType.INTEGER - Returns an array of integers.
PropertyType.BOOLEAN - Returns an array of booleans.
PropertyType.DATETIME - Returns an array of timestamps.
PropertyType.DECIMAL - Returns an array of decimals.
Retrieve the preferred display term for codes.
from pathling import display, to_snomed_coding
# Get the display term for parent codes.
with_displays = exploded_parents.withColumn(
    "PARENT_DISPLAY",
    display(to_snomed_coding(exploded_parents.PARENT))
)
with_displays.show()
Alternative syntax using PathlingContext:
transformed_df = source_df.withColumn(
    "Primary Diagnosis Term",
    pc.snomed.display(col("primary_diagnosis_concept"))
)
Retrieve alternative display terms (synonyms, translations, etc.).
from pathling import designation, to_snomed_coding, Coding
# Get the synonyms for each code in the dataset.
# 900000000000013009 is the SNOMED CT "Synonym" designation use.
synonyms = csv.withColumn(
    "SYNONYMS",
    designation(
        to_snomed_coding(csv.CODE),
        Coding.of_snomed("900000000000013009")
    )
)
# Split each synonym into a separate row.
exploded_synonyms = synonyms.selectExpr(
    "CODE", "DESCRIPTION", "explode_outer(SYNONYMS) AS SYNONYM"
)
exploded_synonyms.show()
from pathling import PathlingContext, to_snomed_coding, to_ecl_value_set, member_of
from pyspark.sql.functions import col, when
pc = PathlingContext.create()
# Define value sets using ECL.
viral_infection_ecl = "<< 34014006 |Viral disease|"
musculoskeletal_injury_ecl = "<< 263534002" # Injury of musculoskeletal system
mental_health_ecl = "<< 36456004 |Mental state finding|"
# Add membership columns for each category.
categorised_df = df.withColumn(
    "Viral Infection",
    member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(viral_infection_ecl))
).withColumn(
    "Musculoskeletal Injury",
    member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(musculoskeletal_injury_ecl))
).withColumn(
    "Mental Health Problem",
    member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(mental_health_ecl))
)
# Create mutually exclusive categories with hierarchy.
mutually_exclusive_df = categorised_df.withColumn(
    "Category",
    when(col("Viral Infection"), "Viral Infection")
    .when(~col("Viral Infection") & col("Musculoskeletal Injury"), "Musculoskeletal Injury")
    .when(~col("Viral Infection") & ~col("Musculoskeletal Injury") & col("Mental Health Problem"), "Mental Health Problem")
    .otherwise("Other")
)
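Chained `when` calls are evaluated in order, like a SQL `CASE WHEN`, so the first matching branch wins. The plain-Python sketch below checks, over every combination of flags, that a simple ordered lookup gives the same category as the version with explicit negations. It assumes the three flags are non-null booleans; with null flags the two forms can differ, because a negated null is still null.

```python
from itertools import product

# Categories in priority order, highest first.
PRIORITY = ["Viral Infection", "Musculoskeletal Injury", "Mental Health Problem"]


def first_match(flags):
    """Return the first category whose flag is set, else 'Other'."""
    for label in PRIORITY:
        if flags[label]:
            return label
    return "Other"


def with_explicit_negations(flags):
    """Mirror the when() chain above, including the redundant negations."""
    viral, injury, mental = (flags[label] for label in PRIORITY)
    if viral:
        return PRIORITY[0]
    if not viral and injury:
        return PRIORITY[1]
    if not viral and not injury and mental:
        return PRIORITY[2]
    return "Other"


# Enumerate all eight combinations of the three boolean flags.
combos = [dict(zip(PRIORITY, values)) for values in product([False, True], repeat=3)]
agree = all(first_match(c) == with_explicit_negations(c) for c in combos)
print(agree)
```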
from pathling import display, property_of, to_snomed_coding, PropertyType
# Add display terms to codes.
enriched_df = df.withColumn(
    "diagnosis_display",
    display(to_snomed_coding(df.diagnosis_code))
)
# Add parent codes.
with_parents = enriched_df.withColumn(
    "parent_codes",
    property_of(to_snomed_coding(df.diagnosis_code), "parent", PropertyType.CODE)
)
After creating a view or running terminology functions, you can convert the result to a Pandas DataFrame for use in Python data science tools.
# Convert to Pandas.
pandas_df = result.toPandas()
# Use with plotting libraries.
import plotly.express as px
fig = px.bar(pandas_df, x="category", y="count")
fig.show()
terminology_server_url - URL of the FHIR terminology server.
token_endpoint - OAuth2 token endpoint for authentication.
client_id - OAuth2 client ID.
client_secret - OAuth2 client secret.
terminology_verbose_request_logging - Enable verbose logging of terminology requests.
cache_override_expiry - Cache expiry time in seconds.
cache_storage_type - Cache storage type ("memory" or "disk").
cache_storage_path - Path for disk-based cache.
When running your own Spark cluster, configure Pathling as a Spark package:
spark.jars.packages au.csiro.pathling:library-api:[version]
from pathling import MimeType, Version
# MimeType values.
MimeType.FHIR_JSON # application/fhir+json
MimeType.FHIR_XML # application/fhir+xml
# Version values.
Version.R4 # FHIR R4
Install both packages:
pathling PyPI package
au.csiro.pathling:library-api Maven package
Enable Java 21 support in Advanced Options > Spark > Environment Variables:
JNAME=zulu21-ca-amd64
Always create Coding structs when using terminology functions - Use to_coding(), to_snomed_coding(), or the Coding class to convert code columns into the proper struct format.
ECL expressions need to be converted to value set URIs - Use to_ecl_value_set() to convert ECL expressions before passing them to member_of().
Terminology functions return new columns - Use .withColumn() or .select() to add terminology function results to your DataFrame.
Resource encoding requires the correct resource type - Make sure to specify the correct FHIR resource type when using encode() or encode_bundle().
SQL on FHIR views use FHIRPath syntax - The path elements in view definitions use FHIRPath expressions, not SQL or Python syntax.
PathlingContext manages the Spark session - Access the Spark session via pc.spark, don't create a separate one.
Use DataSource API for reading data - Prefer pc.read.ndjson() or pc.read.tables() over manual encoding.
Cache terminology results - Configure terminology caching to avoid repeated requests to the terminology server.
Use an appropriate terminology server - For Australian FHIR content, use the Australian terminology server.
Batch terminology operations - Process data in batches to improve performance of terminology operations.
Use SQL on FHIR views for complex projections - Views provide a declarative way to flatten and transform FHIR data.
Profile your Spark jobs - Use Spark's monitoring tools to identify performance bottlenecks.
Set appropriate log levels - Use pc.spark.sparkContext.setLogLevel() to control logging verbosity.