src/PowerPlatform/Dataverse/claude_skill/dataverse-sdk-use/SKILL.md
Guidance for using the PowerPlatform Dataverse Client Python SDK. Use when calling the SDK for CRUD operations, SQL queries, table metadata management, relationships, and file uploads.
Use the PowerPlatform Dataverse Client Python SDK to interact with Microsoft Dataverse.

- Standard tables, e.g. "account", "contact"
- Custom tables, with customization prefix, e.g. "new_Product", "cr123_Invoice"
- Custom columns, with customization prefix, e.g. "new_Price", "cr123_Status"

- client.records -- CRUD and OData queries
- client.query -- query and search operations
- client.tables -- table metadata, columns, and relationships
- client.files -- file upload operations
- client.batch -- batch multiple operations into a single HTTP request

The SDK supports Dataverse's native bulk operations: pass lists to create() or update() for automatic bulk processing; for delete(), set use_bulk_delete=True when passing a list to use the bulk operation.
- Use the page_size parameter on records.list(), records.list_pages(), or QueryBuilder.page_size() to control page size
- Use the top parameter to limit total records returned
- client.query.builder(table)....execute_pages() -- composable where(col(...)) filters, formatted values, expand with nested selects, full pagination control
- records.list_pages(table, *, filter, select, top, orderby, expand, page_size, count, include_annotations) -- string-based OData filter only, yields one QueryResult per page
- execute(by_page=True/False) is deprecated and emits UserWarning; use execute_pages() instead
- QueryBuilder.to_dataframe() is deprecated; use .execute().to_dataframe() instead

Results returned by records.list(), records.retrieve(), execute(), and each page from list_pages() / execute_pages() support:
- for record in result -- each item is a dict-like Record
- .to_dataframe() -- convert to pandas DataFrame
- .first() -- return the first record or None (safe: returns None on empty result)
- result[n] -- index access returns a Record; result[n:m] returns a QueryResult
- len(result) -- number of records in this result/page

- client.dataframe namespace: client.dataframe.create(), client.dataframe.update(), client.dataframe.delete(). client.dataframe.get() is deprecated; use client.query.builder(table).where(...).execute().to_dataframe() instead

from azure.identity import (
InteractiveBrowserCredential,
ClientSecretCredential,
CertificateCredential,
AzureCliCredential
)
from PowerPlatform.Dataverse.client import DataverseClient
# Development options
credential = InteractiveBrowserCredential()
credential = AzureCliCredential()
# Production options
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
credential = CertificateCredential(tenant_id, client_id, cert_path)
# Create client with context manager (recommended -- enables HTTP connection pooling)
# No trailing slash on URL!
with DataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    ...  # all operations here
# Session closed, caches cleared automatically
# Or without context manager:
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
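The comment above warns against a trailing slash on the environment URL. A minimal sketch of a guard that could run before constructing the client follows. `normalize_org_url` is a hypothetical helper written for this illustration, not part of the SDK, and the SDK may already perform similar checks itself.

```python
def normalize_org_url(url: str) -> str:
    """Strip surrounding whitespace and trailing slashes from an environment URL.

    Hypothetical helper for illustration only; not an SDK function.
    """
    cleaned = url.strip().rstrip("/")
    if not cleaned.startswith("https://"):
        raise ValueError(f"expected an https:// URL, got {url!r}")
    return cleaned


print(normalize_org_url("https://yourorg.crm.dynamics.com/"))
```

The returned string can then be passed as the first argument to the client constructor.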
# Single record
account_id = client.records.create("account", {"name": "Contoso Ltd", "telephone1": "555-0100"})
# Bulk create (uses CreateMultiple API automatically)
contacts = [
{"firstname": "John", "lastname": "Doe"},
{"firstname": "Jane", "lastname": "Smith"}
]
contact_ids = client.records.create("contact", contacts)
# Get single record by ID
account = client.records.retrieve("account", account_id, select=["name", "telephone1"])
# With expand — fetch a related record in the same HTTP request
account = client.records.retrieve(
"account", account_id,
select=["name"],
expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))
# Simple shortcut — use records.list() only for basic filter + select without composable logic.
# Follows @odata.nextLink automatically and loads all matching records into memory.
# For filtering, sorting, expansion, or formatted values, prefer client.query.builder() (see below).
result = client.records.list("account", filter="statecode eq 0", select=["name", "accountid"])
for record in result:
    print(record["name"])
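Because records.list() takes its filter as a raw OData string, values that contain a single quote need escaping before they are interpolated. In OData, a single quote inside a string literal is written as two single quotes. The sketch below shows one way to do that; `odata_str` is a hypothetical helper name, not an SDK function.

```python
def odata_str(value: str) -> str:
    """Return value as an OData string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


name = "O'Brien Ltd"  # sample value containing a quote
filter_expr = f"statecode eq 0 and name eq {odata_str(name)}"
print(filter_expr)
```

The resulting string is what would be passed as the filter argument. The query builder's col(...) expressions avoid hand-built strings altogether.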
Use client.query.builder() for any query that goes beyond simple filter + select. It provides composable where(col(...)) expressions, formatted value support, nested expansion, and streaming — all with a fluent API.
from PowerPlatform.Dataverse.models.filters import col
from PowerPlatform.Dataverse.models.query_builder import ExpandOption
# Basic query with composable filter and sort
result = (client.query.builder("account")
.select("accountid", "name", "statecode")
.where(col("statecode") == 0)
.order_by("name asc")
.execute())
for record in result:
    print(record["name"])
# Composable filters — AND / OR / NOT using Python operators
result = (client.query.builder("contact")
.select("fullname", "emailaddress1")
.where((col("statecode") == 0) & (col("emailaddress1").contains("@contoso.com")))
.execute())
# Formatted values — display labels for option sets, currency symbols, etc.
result = (client.query.builder("account")
.select("accountid", "name", "industrycode")
.where(col("statecode") == 0)
.include_formatted_values()
.execute())
for record in result:
    label = record.get("industrycode@OData.Community.Display.V1.FormattedValue")
    print(record["name"], label)
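Formatted values arrive under an annotation key made of the column name plus the OData.Community.Display.V1.FormattedValue suffix. A small sketch of a lookup that falls back to the raw value when no label is present is shown below. `display_value` is a hypothetical helper, and the record is a plain dict with made-up sample values standing in for the SDK's dict-like Record.

```python
FORMATTED_SUFFIX = "@OData.Community.Display.V1.FormattedValue"


def display_value(record, column, default=None):
    """Return the formatted label for column if present, else the raw value."""
    label = record.get(column + FORMATTED_SUFFIX)
    if label is not None:
        return label
    return record.get(column, default)


# Plain dict standing in for a Record; the values are invented for the demo
sample = {
    "name": "Contoso",
    "industrycode": 7,
    "industrycode" + FORMATTED_SUFFIX: "Consulting",
}
print(display_value(sample, "industrycode"), display_value(sample, "name"))
```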
# Navigation property expansion with nested column select
result = (client.query.builder("account")
.select("name")
.expand(ExpandOption("primarycontactid").select("fullname", "emailaddress1"))
.where(col("statecode") == 0)
.execute())
for record in result:
    # The expanded lookup can be null, so fall back to an empty dict
    contact = record.get("primarycontactid") or {}
    print(f"{record['name']} - {contact.get('fullname', 'N/A')}")
# Stream large result sets page-by-page (memory-efficient)
for page in (client.query.builder("account")
             .select("accountid", "name")
             .where(col("statecode") == 0)
             .order_by("name asc")
             .page_size(500)
             .execute_pages()):
    for record in page:
        print(record["name"])
# Convert query results to a DataFrame
df = (client.query.builder("account")
.select("accountid", "name")
.where(col("statecode") == 0)
.execute()
.to_dataframe())
# Limit total results
result = client.query.builder("account").select("name").top(100).execute()
# Simple streaming shortcut via records.list_pages() (string filter only, same params as records.list())
for page in client.records.list_pages("account", filter="statecode eq 0", select=["name"], page_size=500):
    for record in page:
        print(record["name"])
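When the caller only wants a flat stream of records, the nested page/record loops can be wrapped in a small generator helper. The sketch below uses plain lists as stand-in pages so it runs without the SDK. `iter_records` is a hypothetical helper, and in real use the argument would be the iterator returned by execute_pages() or list_pages().

```python
from itertools import chain, islice
from typing import Iterable, Iterator


def iter_records(pages: Iterable[Iterable[dict]]) -> Iterator[dict]:
    """Lazily yield records one by one from an iterable of pages."""
    return chain.from_iterable(pages)


# Stand-in pages; each inner list plays the role of one page of results
fake_pages = [[{"name": "A"}, {"name": "B"}], [{"name": "C"}]]

# Early exit: take only the first two records without touching later pages
first_two = list(islice(iter_records(fake_pages), 2))
print([r["name"] for r in first_two])
```

Because the chain is lazy, stopping early means later pages are never requested from the underlying iterator.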
# Set lookup fields using @odata.bind with PascalCase navigation property names
# CORRECT: use the navigation property name (case-sensitive, must match $metadata)
guid = client.records.create("new_ticket", {
    "new_name": "TKT-001",
    "new_CustomerId@odata.bind": f"/new_customers({customer_id})",
    "new_AgentId@odata.bind": f"/new_agents({agent_id})",
})
# WRONG: lowercase navigation property causes 400 error
# "new_customerid@odata.bind" -> ODataException: undeclared property 'new_customerid'
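The lookup key and value follow a fixed shape: the navigation property name plus @odata.bind as the key, and /entityset(guid) as the value. A sketch of a small builder that keeps that shape in one place is shown below. `bind_lookup` is a hypothetical helper, not an SDK function, and it deliberately leaves the casing of the navigation property untouched, since that must match $metadata.

```python
def bind_lookup(nav_property: str, entity_set: str, record_id: str) -> dict:
    """Build a one-entry payload fragment that sets a lookup via @odata.bind.

    nav_property must already use the exact casing from $metadata;
    this helper does not alter it.
    """
    return {f"{nav_property}@odata.bind": f"/{entity_set}({record_id})"}


customer_id = "00000000-0000-0000-0000-000000000001"  # placeholder GUID
payload = {
    "new_name": "TKT-001",
    **bind_lookup("new_CustomerId", "new_customers", customer_id),
}
print(payload)
```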
# Single update
client.records.update("account", account_id, {"telephone1": "555-0200"})
# Bulk update (broadcast same change to multiple records)
client.records.update("account", [id1, id2, id3], {"industry": "Technology"})
Creates or updates records identified by alternate keys. Single item -> PATCH; multiple items -> UpsertMultiple bulk action.
Prerequisite: The table must have an alternate key configured in Dataverse for the columns used in
alternate_key. Without it, Dataverse will reject the request with a 400 error.
from PowerPlatform.Dataverse.models import UpsertItem
# Single upsert
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd", "telephone1": "555-0100"},
)
])
# Bulk upsert (uses UpsertMultiple API automatically)
client.records.upsert("account", [
UpsertItem(alternate_key={"accountnumber": "ACC-001"}, record={"name": "Contoso Ltd"}),
UpsertItem(alternate_key={"accountnumber": "ACC-002"}, record={"name": "Fabrikam Inc"}),
])
# Composite alternate key
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
record={"name": "Contoso Ltd"},
)
])
# Plain dict syntax (no import needed)
client.records.upsert("account", [
{"alternate_key": {"accountnumber": "ACC-001"}, "record": {"name": "Contoso Ltd"}}
])
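When the source data is a list of flat rows, the plain dict form shown above can be produced mechanically by splitting each row into its key columns and the remaining fields. The following is a sketch under that assumption. `to_upsert_items` is a hypothetical helper, and the sample rows are invented.

```python
def to_upsert_items(rows, key_columns):
    """Split each row into an alternate_key part and a record part."""
    items = []
    for row in rows:
        missing = [k for k in key_columns if k not in row]
        if missing:
            raise KeyError(f"row is missing alternate key column(s): {missing}")
        items.append({
            "alternate_key": {k: row[k] for k in key_columns},
            "record": {k: v for k, v in row.items() if k not in key_columns},
        })
    return items


rows = [
    {"accountnumber": "ACC-001", "name": "Contoso Ltd"},
    {"accountnumber": "ACC-002", "name": "Fabrikam Inc"},
]
items = to_upsert_items(rows, ["accountnumber"])
print(items[0])
```

The resulting list has the same shape as the plain dict example and could be passed as the second argument to the upsert call.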
# Single delete
client.records.delete("account", account_id)
# Bulk delete (uses BulkDelete API)
client.records.delete("account", [id1, id2, id3], use_bulk_delete=True)
The SDK provides DataFrame wrappers for all CRUD operations via the client.dataframe namespace, using pandas DataFrames and Series as input/output.
Note: client.dataframe.get() is deprecated. Use client.query.builder(table).select(...).where(...).execute().to_dataframe() instead. QueryBuilder.to_dataframe() (without .execute()) is also deprecated; always call .execute() first.
import pandas as pd
# Query records -- returns a single DataFrame (GA pattern: .execute().to_dataframe())
from PowerPlatform.Dataverse.models.filters import col
df = client.query.builder("account").where(col("statecode") == 0).select("name").execute().to_dataframe()
print(f"Got {len(df)} rows")
# Limit results with top
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()
# Via records.list() (simpler for basic queries)
df = client.records.list("account", filter="statecode eq 0", select=["name"]).to_dataframe()
# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
{"name": "Contoso", "telephone1": "555-0100"},
{"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)
# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")
# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": "guid-1", "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)
# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])
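The clear_nulls behavior described in the update comments can be pictured per row: missing values are either left out of the payload (the default) or sent as explicit nulls. The sketch below illustrates those semantics with plain dicts. It is not the SDK's implementation, and `row_to_payload` is a hypothetical name.

```python
import math


def is_missing(value) -> bool:
    """True for None and float NaN, the two 'empty' markers considered here."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def row_to_payload(row: dict, id_column: str, clear_nulls: bool = False) -> dict:
    """Build an update payload from one row, excluding the id column."""
    payload = {}
    for key, value in row.items():
        if key == id_column:
            continue
        if is_missing(value):
            if clear_nulls:
                payload[key] = None  # explicit null clears the field
            continue  # otherwise the field is skipped
        payload[key] = value
    return payload


row = {"accountid": "guid-1", "name": "Contoso", "websiteurl": None}
print(row_to_payload(row, "accountid"))
print(row_to_payload(row, "accountid", clear_nulls=True))
```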
SQL queries are read-only and support a limited SQL syntax: a single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM. JOIN and subqueries may not be supported. Refer to the Dataverse documentation for the current feature set.
results = client.query.sql(
"SELECT TOP 10 accountid, name FROM account WHERE statecode = 0"
)
for record in results:
    print(record["name"])
client.query.fetchxml(xml) returns an inert FetchXmlQuery object — no HTTP request is made until .execute() or .execute_pages() is called.
xml = """
<fetch top="50">
<entity name="account">
<attribute name="accountid" />
<attribute name="name" />
<filter>
<condition attribute="statecode" operator="eq" value="0" />
</filter>
</entity>
</fetch>
"""
# Load all results into memory (simple, small-to-medium sets)
query = client.query.fetchxml(xml)
result = query.execute() # returns QueryResult — all pages fetched upfront
for record in result:
    print(record["name"])
# Stream page-by-page (large sets or early exit)
for page in query.execute_pages():  # yields one QueryResult per HTTP page
    process(page.to_dataframe())  # process() is a placeholder for your own handler
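When the FetchXML depends on runtime values, building it with the standard library's ElementTree avoids hand-escaping attribute values. The sketch below reproduces the query shown above. `build_fetchxml` is a hypothetical helper, and it only covers the elements used in this document (fetch, entity, attribute, filter, condition).

```python
import xml.etree.ElementTree as ET


def build_fetchxml(entity, attributes, conditions=(), top=None):
    """Return a FetchXML string for one entity with optional simple conditions.

    conditions is an iterable of (attribute, operator, value) tuples.
    """
    fetch = ET.Element("fetch")
    if top is not None:
        fetch.set("top", str(top))
    entity_el = ET.SubElement(fetch, "entity", name=entity)
    for attr in attributes:
        ET.SubElement(entity_el, "attribute", name=attr)
    if conditions:
        filter_el = ET.SubElement(entity_el, "filter")
        for attribute, operator, value in conditions:
            ET.SubElement(
                filter_el, "condition",
                attribute=attribute, operator=operator, value=str(value),
            )
    return ET.tostring(fetch, encoding="unicode")


fetch_xml = build_fetchxml("account", ["accountid", "name"], [("statecode", "eq", 0)], top=50)
print(fetch_xml)
```

The returned string can be handed to client.query.fetchxml() in place of the literal.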
# Create table with columns (include customization prefix!)
table_info = client.tables.create(
"new_Product",
{
"new_Code": "string",
"new_Price": "decimal",
"new_Active": "bool",
"new_Quantity": "int",
},
)
# With solution assignment and custom primary column
table_info = client.tables.create(
"new_Product",
{"new_Code": "string", "new_Price": "decimal"},
solution="MyPublisher",
primary_column="new_ProductCode",
)
Types on the same line map to exactly the same format under the hood:
- "string" or "text" - Single line of text
- "memo" or "multiline" - Multiple lines of text (4000 character default)
- "int" or "integer" - Whole number
- "decimal" or "money" - Decimal number
- "float" or "double" - Floating point number
- "bool" or "boolean" - Yes/No
- "datetime" or "date" - Date
- "file" - File column

# Add columns to existing table (must include customization prefix!)
client.tables.add_columns("new_Product", {
"new_Category": "string",
"new_InStock": "bool",
})
# Remove columns
client.tables.remove_columns("new_Product", ["new_Category"])
# Get single table information
table_info = client.tables.get("new_Product")
print(f"Logical name: {table_info['table_logical_name']}")
print(f"Entity set: {table_info['entity_set_name']}")
# List all tables
tables = client.tables.list()
for table in tables:
    print(table)
client.tables.delete("new_Product")
from PowerPlatform.Dataverse.models import (
CascadeConfiguration,
Label,
LocalizedLabel,
LookupAttributeMetadata,
OneToManyRelationshipMetadata,
)
from PowerPlatform.Dataverse.common.constants import CASCADE_BEHAVIOR_REMOVE_LINK
lookup = LookupAttributeMetadata(
schema_name="new_DepartmentId",
display_name=Label(
localized_labels=[LocalizedLabel(label="Department", language_code=1033)]
),
)
relationship = OneToManyRelationshipMetadata(
schema_name="new_Department_Employee",
referenced_entity="new_department",
referencing_entity="new_employee",
referenced_attribute="new_departmentid",
cascade_configuration=CascadeConfiguration(
delete=CASCADE_BEHAVIOR_REMOVE_LINK,
),
)
result = client.tables.create_one_to_many_relationship(lookup, relationship)
print(f"Created lookup field: {result.lookup_schema_name}")
from PowerPlatform.Dataverse.models import ManyToManyRelationshipMetadata
relationship = ManyToManyRelationshipMetadata(
schema_name="new_employee_project",
entity1_logical_name="new_employee",
entity2_logical_name="new_project",
)
result = client.tables.create_many_to_many_relationship(relationship)
print(f"Created: {result.relationship_schema_name}")
result = client.tables.create_lookup_field(
referencing_table="new_order",
lookup_field_name="new_AccountId",
referenced_table="account",
display_name="Account",
required=True,
)
# Get relationship metadata
rel = client.tables.get_relationship("new_Department_Employee")
if rel:
    print(f"Found: {rel.relationship_schema_name}")
# Delete relationship
client.tables.delete_relationship(result.relationship_id)
# Upload file to a file column
client.files.upload(
table="account",
record_id=account_id,
file_column="new_Document", # If the file column doesn't exist, it will be created automatically
path="/path/to/document.pdf",
)
Use client.batch to send multiple operations in one HTTP request. All batch methods return None; results arrive via BatchResult after execute().
# Build a batch request
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"], include_annotations="OData.Community.Display.V1.FormattedValue") # single record with expand
batch.records.list("account", filter="statecode eq 0", select=["name"], orderby=["name asc"], top=50, page_size=25, count=True) # multi-record, single page
batch.query.sql("SELECT TOP 5 name FROM account")
result = batch.execute()
for item in result.responses:
    if item.is_success:
        print(f"[OK] {item.status_code} entity_id={item.entity_id}")
        if item.data:
            # GET responses populate item.data with the parsed JSON record
            print(item.data.get("name"))
    else:
        print(f"[ERR] {item.status_code}: {item.error_message}")
# Transactional changeset (all succeed or roll back)
with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
# Continue on error
result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
BatchResult properties:
- result.responses -- list of BatchItemResponse in submission order
- result.succeeded -- responses with 2xx status codes
- result.failed -- responses with non-2xx status codes
- result.has_errors -- True if any response failed
- result.entity_ids -- GUIDs from OData-EntityId headers (creates and updates)

Batch limitations:
- batch.records.get() is deprecated; use batch.records.retrieve() for single records
- batch.records.list() returns a single page (no pagination); use top to bound results
- flush_cache() is not supported in batch

The SDK provides structured exceptions with detailed error information:
from PowerPlatform.Dataverse.core.errors import (
DataverseError,
HttpError,
MetadataError,
SQLParseError,
ValidationError,
)
from PowerPlatform.Dataverse.client import DataverseClient
try:
    client.records.retrieve("account", "invalid-id")
except HttpError as e:
    print(f"HTTP {e.status_code}: {e.message}")
    print(f"Error code: {e.code}")
    print(f"Subcode: {e.subcode}")
    if e.is_transient:
        print("This error may be retryable")
except ValidationError as e:
    print(f"Validation error: {e.message}")
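The is_transient flag lends itself to a small retry wrapper. The sketch below retries with exponential backoff only while the raised error is marked transient. `call_with_retry` and `FakeHttpError` are hypothetical names: the fake exception models only the is_transient attribute so the example runs without the SDK, and the backoff parameters are arbitrary.

```python
import time


class FakeHttpError(Exception):
    """Stand-in for the SDK's HttpError; only is_transient is modeled."""

    def __init__(self, message, is_transient=False):
        super().__init__(message)
        self.is_transient = is_transient


def call_with_retry(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff while errors are transient."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            last_try = attempt == attempts - 1
            if last_try or not getattr(exc, "is_transient", False):
                raise  # non-transient, or out of attempts
            sleep(base_delay * (2 ** attempt))


calls = {"n": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeHttpError("503 Service Unavailable", is_transient=True)
    return "ok"


# sleep is swapped for a no-op so the demo finishes instantly
outcome = call_with_retry(flaky, sleep=lambda seconds: None)
print(outcome, calls["n"])
```

In real use, fn would be a lambda wrapping an SDK call, and the except clause could be narrowed to the SDK's HttpError.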
Authentication failures:
404 Not Found:
400 Bad Request:
- @odata.bind errors ("undeclared property"): the navigation property name before @odata.bind is case-sensitive and must match the entity's $metadata exactly (e.g., new_CustomerId@odata.bind for custom lookups, parentaccountid@odata.bind for system lookups). The SDK preserves @odata.bind key casing.

- Prefer client.query.builder() for any non-trivial query: use the builder for filtering, sorting, expansion, or formatted values; records.list() is a convenience shortcut for simple filter+select only
- Use the top and page_size parameters appropriately; use execute_pages() for large sets
- Retry only transient errors (check e.is_transient)
- Check $metadata for navigation properties: column names in $select/$filter/record payloads use lowercase LogicalNames. Navigation properties in $expand and @odata.bind keys are case-sensitive and must match the entity's $metadata (PascalCase for custom lookups like new_CustomerId, lowercase for system lookups like parentaccountid)
- Constants such as CASCADE_BEHAVIOR_REMOVE_LINK live in PowerPlatform.Dataverse.common.constants

The SDK ships a full async client, AsyncDataverseClient, under PowerPlatform.Dataverse.aio. It requires the [async] extra: pip install "PowerPlatform-Dataverse-Client[async]".
Note: snippets in this section are fragments. Every await line assumes it lives inside an async def main(): ... body with client and credential already constructed (see the Client Initialization block for the wrapper). Outside an async function, await is a SyntaxError.
from azure.identity.aio import DefaultAzureCredential
from PowerPlatform.Dataverse.aio import AsyncDataverseClient
# given: credential constructed (e.g. DefaultAzureCredential())
# Context manager (recommended -- closes session and clears caches automatically)
async with AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    ...  # all operations here
# Standalone (call aclose() in a finally block)
client = AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential)
try:
    ...
finally:
    await client.aclose()
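Since the fragments in this section assume an enclosing async def main(), a runnable skeleton of that wrapper may help. The sketch below uses a stub class in place of AsyncDataverseClient so it runs without the SDK or credentials. `StubAsyncClient` and its `ping` method are hypothetical and exist only for this demo.

```python
import asyncio


class StubAsyncClient:
    """Stand-in for AsyncDataverseClient so the sketch runs without the SDK."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True  # mimics session cleanup on exit
        return False

    async def ping(self):
        """Hypothetical awaitable operation, only for the demo."""
        await asyncio.sleep(0)
        return "pong"


async def main():
    # Every await lives inside this coroutine; the client is closed on exit
    async with StubAsyncClient() as client:
        reply = await client.ping()
    return reply, client.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the real client, the async with line would construct AsyncDataverseClient with the environment URL and an async credential, and the body would hold the awaited SDK calls.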
Every sync method has an async equivalent -- add await:
# given: client is an open AsyncDataverseClient
# Create
account_id = await client.records.create("account", {"name": "Contoso Ltd"})
# Read
account = await client.records.retrieve("account", account_id, select=["name", "telephone1"])
# Update
await client.records.update("account", account_id, {"telephone1": "555-0200"})
# Delete
await client.records.delete("account", account_id)
# Bulk create
ids = await client.records.create("account", [{"name": "A"}, {"name": "B"}])
# given: client is an open AsyncDataverseClient
from PowerPlatform.Dataverse.models.filters import col
# Collect all results
result = await (
client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.top(10)
.execute()
)
for record in result:
    print(record["name"])
# Lazy page iteration (memory-efficient)
async for page in (
    client.query.builder("account")
    .select("name")
    .page_size(500)
    .execute_pages()
):
    for record in page:
        print(record["name"])
# SQL query
rows = await client.query.sql("SELECT TOP 5 name FROM account")
# FetchXML
xml = '<fetch top="5"><entity name="account"><attribute name="name"/></entity></fetch>'
rows = await client.query.fetchxml(xml).execute()
# given: client is open; account_id from an earlier records.create
# Plain batch
batch = client.batch.new()
batch.records.create("account", {"name": "Alpha"})
result = await batch.execute()
# Atomic changeset
batch = client.batch.new()
async with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
result = await batch.execute()
# given: client is an open AsyncDataverseClient
import pandas as pd
# Query to DataFrame
result = await (
client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.execute()
)
df = result.to_dataframe()
# Create from DataFrame
new_accounts = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
ids = await client.dataframe.create("account", new_accounts)
Load these resources as needed during development:
- Use client.query.builder() for queries: it is the primary query pattern; records.list() is a shortcut for trivial filter+select only
- Use an environment URL without a trailing slash, e.g. https://org.crm.dynamics.com
- Check is_transient for retry logic