.github/skills/durable-task-python/SKILL.md
Build durable, fault-tolerant workflows in Python using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, entities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or stateful agents. Applies to any Python application requiring durable execution, state persistence, or distributed transactions without Azure Functions dependency.
npx skillsauth add Azure-Samples/Durable-Task-Scheduler durable-task-pythonInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.
pip install durabletask durabletask-azuremanaged azure-identity
Or add to requirements.txt:
durabletask
durabletask-azuremanaged
azure-identity
import os
from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.client import OrchestrationStatus
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# Activity function
def hello(ctx: task.ActivityContext, name: str) -> str:
return f"Hello {name}!"
# Orchestrator function
def my_orchestration(ctx: task.OrchestrationContext, input: str):
result = yield ctx.call_activity(hello, input=input)
return result
# Configuration - defaults to local emulator
taskhub = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
secure_channel = endpoint != "http://localhost:8080"
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
# Start worker and run orchestration
with DurableTaskSchedulerWorker(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub,
token_credential=credential
) as worker:
worker.add_orchestrator(my_orchestration)
worker.add_activity(hello)
worker.start()
# Create client and schedule orchestration
dts_client = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub,
token_credential=credential
)
instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == OrchestrationStatus.COMPLETED:
print(f"Result: {state.serialized_output}")
| Pattern | Use When |
|---------|----------|
| Function Chaining | Sequential steps where each depends on the previous |
| Fan-Out/Fan-In | Parallel processing with aggregated results |
| Human Interaction | Workflow pauses for external input/approval |
| Durable Entities | Stateful objects with operations (counters, accounts) |
| Sub-Orchestrations | Reusable workflow components or version isolation |
| Eternal Orchestrations | Long-running background processes with continue_as_new |
| Monitoring | Periodic polling with configurable timeouts |
See references/patterns.md for detailed implementations.
def my_orchestration(ctx: task.OrchestrationContext, input: str):
"""Orchestrator function - MUST be deterministic"""
# Call activities sequentially
step1 = yield ctx.call_activity(step1_activity, input=input)
step2 = yield ctx.call_activity(step2_activity, input=step1)
return step2
def my_activity(ctx: task.ActivityContext, input: str) -> str:
"""Activity function - can have side effects, I/O, non-determinism"""
# Perform actual work here
print(f"Processing: {input}")
return f"Processed: {input}"
with DurableTaskSchedulerWorker(...) as worker:
worker.add_orchestrator(my_orchestration)
worker.add_activity(step1_activity)
worker.add_activity(step2_activity)
worker.start()
Orchestrations replay from history - all code MUST be deterministic. When an orchestration resumes, it replays all previous code to rebuild state. Non-deterministic code produces different results on replay, causing failures.
NEVER do inside orchestrations:
datetime.now(), datetime.utcnow() → Use ctx.current_utc_datetimeuuid.uuid4() → Use ctx.new_uuid()random.random() → Pass random values from activitiestime.sleep(), asyncio.sleep() → Use ctx.create_timer()ALWAYS use:
yield ctx.call_activity() - Call activitiesyield ctx.call_sub_orchestrator() - Call sub-orchestrationsyield ctx.create_timer() - Durable delaysyield ctx.wait_for_external_event() - Wait for eventsctx.current_utc_datetime - Current timectx.new_uuid() - Generate GUIDsctx.set_custom_status() - Set status# WRONG - datetime.now() returns different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
current_time = datetime.now() # Non-deterministic!
if current_time.hour < 12:
yield ctx.call_activity(morning_activity)
# CORRECT - ctx.current_utc_datetime is replayed consistently
def good_orchestration(ctx: task.OrchestrationContext, _):
current_time = ctx.current_utc_datetime # Deterministic
if current_time.hour < 12:
yield ctx.call_activity(morning_activity)
# WRONG - uuid4() generates different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
order_id = str(uuid.uuid4()) # Non-deterministic!
yield ctx.call_activity(create_order, input=order_id)
# CORRECT - ctx.new_uuid() replays the same value
def good_orchestration(ctx: task.OrchestrationContext, _):
order_id = str(ctx.new_uuid()) # Deterministic
yield ctx.call_activity(create_order, input=order_id)
# WRONG - random produces different values on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
delay = random.randint(1, 10) # Non-deterministic!
yield ctx.create_timer(timedelta(seconds=delay))
# CORRECT - generate random in activity, pass to orchestrator
def get_random_delay(ctx: task.ActivityContext, _) -> int:
return random.randint(1, 10) # OK in activity
def good_orchestration(ctx: task.OrchestrationContext, _):
delay = yield ctx.call_activity(get_random_delay) # Deterministic
yield ctx.create_timer(timedelta(seconds=delay))
# WRONG - time.sleep blocks and doesn't persist
def bad_orchestration(ctx: task.OrchestrationContext, _):
yield ctx.call_activity(step1)
time.sleep(60) # Non-durable! Lost on restart
yield ctx.call_activity(step2)
# CORRECT - ctx.create_timer is durable
def good_orchestration(ctx: task.OrchestrationContext, _):
yield ctx.call_activity(step1)
yield ctx.create_timer(timedelta(seconds=60)) # Durable timer
yield ctx.call_activity(step2)
# WRONG - HTTP call in orchestrator is non-deterministic
def bad_orchestration(ctx: task.OrchestrationContext, url: str):
import requests
response = requests.get(url) # Non-deterministic!
return response.json()
# CORRECT - move I/O to activity
def fetch_data(ctx: task.ActivityContext, url: str) -> dict:
import requests
response = requests.get(url) # OK in activity
return response.json()
def good_orchestration(ctx: task.OrchestrationContext, url: str):
data = yield ctx.call_activity(fetch_data, input=url) # Deterministic
return data
# WRONG - database query in orchestrator
def bad_orchestration(ctx: task.OrchestrationContext, user_id: str):
import sqlite3
conn = sqlite3.connect('db.sqlite') # Non-deterministic!
cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
user = cursor.fetchone()
# ...
# CORRECT - database access in activity
def get_user(ctx: task.ActivityContext, user_id: str) -> dict:
import sqlite3
conn = sqlite3.connect('db.sqlite') # OK in activity
cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
return dict(cursor.fetchone())
def good_orchestration(ctx: task.OrchestrationContext, user_id: str):
user = yield ctx.call_activity(get_user, input=user_id)
# ...
# WRONG - env var might change between replays
def bad_orchestration(ctx: task.OrchestrationContext, _):
api_endpoint = os.getenv("API_ENDPOINT") # Could change!
yield ctx.call_activity(call_api, input=api_endpoint)
# CORRECT - pass config as input or read in activity
def good_orchestration(ctx: task.OrchestrationContext, config: dict):
api_endpoint = config["api_endpoint"] # From input, deterministic
yield ctx.call_activity(call_api, input=api_endpoint)
# ALSO CORRECT - read env var in activity
def call_api(ctx: task.ActivityContext, _) -> str:
api_endpoint = os.getenv("API_ENDPOINT") # OK in activity
# make the call...
# WRONG - file existence can change between replays
def bad_orchestration(ctx: task.OrchestrationContext, path: str):
if os.path.exists(path): # Non-deterministic!
yield ctx.call_activity(process_file, input=path)
# CORRECT - check in activity
def check_file_exists(ctx: task.ActivityContext, path: str) -> bool:
return os.path.exists(path) # OK in activity
def good_orchestration(ctx: task.OrchestrationContext, path: str):
exists = yield ctx.call_activity(check_file_exists, input=path)
if exists: # Deterministic - based on activity result
yield ctx.call_activity(process_file, input=path)
# POTENTIALLY WRONG - dict iteration order may vary (Python < 3.7)
def risky_orchestration(ctx: task.OrchestrationContext, items: dict):
for key in items: # Order might not be guaranteed
yield ctx.call_activity(process, input=key)
# CORRECT - use sorted keys for deterministic order
def good_orchestration(ctx: task.OrchestrationContext, items: dict):
for key in sorted(items.keys()): # Guaranteed order
yield ctx.call_activity(process, input=key)
# WRONG - global state can change
counter = 0
def bad_orchestration(ctx: task.OrchestrationContext, _):
global counter
counter += 1 # Non-deterministic across replays!
yield ctx.call_activity(process, input=counter)
# CORRECT - pass state through orchestration input/output
def good_orchestration(ctx: task.OrchestrationContext, counter: int):
counter += 1 # Local variable, deterministic
yield ctx.call_activity(process, input=counter)
# If continuing, pass counter forward
ctx.continue_as_new(counter)
In Python, orchestrator functions use yield to await durable operations:
# CORRECT - use yield
result = yield ctx.call_activity(my_activity, input="data")
# WRONG - will not work
result = ctx.call_activity(my_activity, input="data") # Missing yield!
def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str):
try:
result = yield ctx.call_activity(risky_activity, input=input)
return result
except task.TaskFailedError as e:
# Activity failed - implement compensation
ctx.set_custom_status({"error": str(e)})
yield ctx.call_activity(compensation_activity, input=input)
return "Compensated"
from durabletask.task import RetryPolicy
retry_policy = RetryPolicy(
first_retry_interval=5, # seconds
max_number_of_attempts=3,
backoff_coefficient=2.0,
max_retry_interval=60, # seconds
retry_timeout=300 # seconds
)
def orchestrator(ctx: task.OrchestrationContext, _):
result = yield ctx.call_activity(
unreliable_activity,
input="data",
retry_policy=retry_policy
)
return result
The SDK supports dataclasses, namedtuples, and custom classes:
from dataclasses import dataclass
@dataclass
class Order:
product: str
quantity: int
cost: float
def process_order(ctx: task.ActivityContext, order: Order) -> str:
return f"Processed {order.quantity}x {order.product}"
def order_workflow(ctx: task.OrchestrationContext, order: Order):
result = yield ctx.call_activity(process_order, input=order)
return result
# No authentication required
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False
from azure.identity import DefaultAzureCredential
taskhub = "my-taskhub"
endpoint = "https://my-scheduler.region.durabletask.io"
credential = DefaultAzureCredential()
secure_channel = True
def get_connection_config():
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
taskhub = os.getenv("TASKHUB", "default")
is_local = endpoint == "http://localhost:8080"
return {
"host_address": endpoint,
"taskhub": taskhub,
"secure_channel": not is_local,
"token_credential": None if is_local else DefaultAzureCredential()
}
config = get_connection_config()
worker = DurableTaskSchedulerWorker(**config)
client = DurableTaskSchedulerClient(**config)
# Pull and run the emulator
docker pull mcr.microsoft.com/dts/dts-emulator:latest
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
# Dashboard available at http://localhost:8082
# Schedule new orchestration
instance_id = client.schedule_new_orchestration(my_orchestration, input="data")
# Schedule with custom instance ID
instance_id = client.schedule_new_orchestration(
my_orchestration,
input="data",
instance_id="my-custom-id"
)
# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=60)
# Get current status
state = client.get_orchestration_state(instance_id)
# Raise external event
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)
# Terminate orchestration
client.terminate_orchestration(instance_id, output="User cancelled")
# Suspend/Resume
client.suspend_orchestration(instance_id)
client.resume_orchestration(instance_id)
development
Migrate existing Azure Durable Functions apps from existing backend storage providers (Azure Storage, Netherite, MSSQL) to the Durable Task Scheduler. Use when switching backends, converting to azureManaged storage provider, upgrading from Azure Storage default provider, migrating from Netherite Event Hubs-based backend, migrating from Microsoft SQL Server backend, or modernizing Durable Functions infrastructure. Applies to .NET, Python, JavaScript/TypeScript, and Java Durable Functions apps that need to adopt the managed Durable Task Scheduler service.
development
Build durable, fault-tolerant workflows in JavaScript/TypeScript using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, entities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or durable timers. Applies to any Node.js application requiring durable execution, state persistence, or distributed coordination without Azure Functions dependency.
development
Build durable, fault-tolerant workflows in Java using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or monitoring. Applies to any Java application requiring durable execution, state persistence, or distributed transactions without Azure Functions dependency.
development
Build durable, fault-tolerant workflows in .NET using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, entities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or stateful agents. Applies to any .NET application requiring durable execution, state persistence, or distributed transactions without Azure Functions dependency.