skills/dynamodb/SKILL.md
AWS DynamoDB NoSQL database for scalable data storage. Use when designing table schemas, writing queries, configuring indexes, managing capacity, implementing single-table design, or troubleshooting performance issues.
Amazon DynamoDB is a fully managed NoSQL database service providing fast, predictable performance at any scale. It supports key-value and document data structures.
| Key Type | Description |
|----------|-------------|
| Partition Key (PK) | Required. Determines data distribution |
| Sort Key (SK) | Optional. Enables range queries within partition |
| Composite Key | PK + SK combination |

| Index Type | Description |
|------------|-------------|
| GSI (Global Secondary Index) | Different PK/SK, separate throughput, eventually consistent |
| LSI (Local Secondary Index) | Same PK, different SK, shares table throughput, strongly consistent option |

| Mode | Use Case |
|------|----------|
| On-Demand | Unpredictable traffic, pay-per-request |
| Provisioned | Predictable traffic, lower cost, can use auto-scaling |
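When sizing a provisioned table, it can help to estimate capacity units from item size and request rate. The sketch below assumes the standard DynamoDB accounting rules: one read capacity unit covers one strongly consistent read per second of an item up to 4 KB (eventually consistent reads cost half), and one write capacity unit covers one write per second of an item up to 1 KB. The function names are hypothetical, and transactional requests (which cost more) are ignored.

```python
import math


def read_capacity_units(item_size_bytes: int, reads_per_second: int,
                        strongly_consistent: bool = True) -> int:
    """Rough RCU estimate: item size is rounded up to the next 4 KB block."""
    units_per_read = math.ceil(item_size_bytes / 4096)
    total = units_per_read * reads_per_second
    if not strongly_consistent:
        # Eventually consistent reads consume half the capacity.
        total = math.ceil(total / 2)
    return total


def write_capacity_units(item_size_bytes: int, writes_per_second: int) -> int:
    """Rough WCU estimate: item size is rounded up to the next 1 KB block."""
    units_per_write = math.ceil(item_size_bytes / 1024)
    return units_per_write * writes_per_second


if __name__ == "__main__":
    # A 6,000-byte item read 10 times per second with strong consistency.
    print(read_capacity_units(6000, 10))
    # A 1,500-byte item written 5 times per second.
    print(write_capacity_units(1500, 5))
```

Treat the result as a starting point for provisioning or auto-scaling limits rather than an exact bill.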
AWS CLI:
aws dynamodb create-table \
--table-name Users \
--attribute-definitions \
AttributeName=PK,AttributeType=S \
AttributeName=SK,AttributeType=S \
--key-schema \
AttributeName=PK,KeyType=HASH \
AttributeName=SK,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
boto3:
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.create_table(
TableName='Users',
KeySchema=[
{'AttributeName': 'PK', 'KeyType': 'HASH'},
{'AttributeName': 'SK', 'KeyType': 'RANGE'}
],
AttributeDefinitions=[
{'AttributeName': 'PK', 'AttributeType': 'S'},
{'AttributeName': 'SK', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST'
)
table.wait_until_exists()
import boto3
from boto3.dynamodb.conditions import Key, Attr
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
# Put item
table.put_item(
Item={
'PK': 'USER#123',
'SK': 'PROFILE',
'name': 'John Doe',
'email': 'john.doe@example.com',
'created_at': '2024-01-15T10:30:00Z'
}
)
# Get item
response = table.get_item(
Key={'PK': 'USER#123', 'SK': 'PROFILE'}
)
item = response.get('Item')
# Update item
table.update_item(
Key={'PK': 'USER#123', 'SK': 'PROFILE'},
UpdateExpression='SET #name = :name, updated_at = :updated',
ExpressionAttributeNames={'#name': 'name'},
ExpressionAttributeValues={
':name': 'John Smith',
':updated': '2024-01-16T10:30:00Z'
}
)
# Delete item
table.delete_item(
Key={'PK': 'USER#123', 'SK': 'PROFILE'}
)
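In a single-table design, keys such as `USER#123` and `PROFILE` are usually built by small helper functions so the prefixes stay consistent across the codebase. The following is a minimal sketch; the helper names and the `ORDER#` sort-key prefix are assumed naming conventions, not part of any DynamoDB API.

```python
from typing import Dict


def user_pk(user_id: str) -> str:
    """Partition key shared by every item that belongs to one user."""
    return f"USER#{user_id}"


def profile_key(user_id: str) -> Dict[str, str]:
    """Primary key of the user's profile item."""
    return {"PK": user_pk(user_id), "SK": "PROFILE"}


def order_key(user_id: str, order_id: str) -> Dict[str, str]:
    """Primary key of one order item stored in the user's partition."""
    return {"PK": user_pk(user_id), "SK": f"ORDER#{order_id}"}


if __name__ == "__main__":
    print(profile_key("123"))
    print(order_key("123", "2024-0001"))
```

The returned dictionaries can be passed directly as the `Key` argument of `table.get_item`, `table.update_item`, or `table.delete_item`.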
# Query by partition key
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123')
)
# Query with sort key condition
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#')
)
# Query with filter
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123'),
FilterExpression=Attr('status').eq('active')
)
# Query with projection
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123'),
ProjectionExpression='PK, SK, #name, email',
ExpressionAttributeNames={'#name': 'name'}
)
# Paginated query (plain low-level client, so values use DynamoDB's typed format)
client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
for page in paginator.paginate(
    TableName='Users',
    KeyConditionExpression='PK = :pk',
    ExpressionAttributeValues={':pk': {'S': 'USER#123'}}
):
    for item in page['Items']:
        print(item)
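A single `query` call returns at most 1 MB of data, and the response carries a `LastEvaluatedKey` when more pages remain. As an alternative to the paginator, the loop can be written by hand. The sketch below takes the query function as a parameter (for example `table.query`), so it can be exercised without AWS access; `iter_items` is a hypothetical helper name.

```python
from typing import Any, Callable, Dict, Iterator


def iter_items(query_fn: Callable[..., Dict[str, Any]],
               **kwargs: Any) -> Iterator[Any]:
    """Yield items from every page by following LastEvaluatedKey."""
    while True:
        page = query_fn(**kwargs)
        yield from page.get("Items", [])
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        # Resume the next request where the previous page stopped.
        kwargs["ExclusiveStartKey"] = last_key


if __name__ == "__main__":
    # Stand-in for table.query that serves two canned pages.
    canned = [
        {"Items": [{"SK": "ORDER#1"}], "LastEvaluatedKey": {"SK": "ORDER#1"}},
        {"Items": [{"SK": "ORDER#2"}]},
    ]
    state = {"calls": 0}

    def fake_query(**params: Any) -> Dict[str, Any]:
        page = canned[state["calls"]]
        state["calls"] += 1
        return page

    for item in iter_items(fake_query):
        print(item)
```

With the resource API this would be called as `iter_items(table.query, KeyConditionExpression=Key('PK').eq('USER#123'))`.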
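# Batch write: batch_writer buffers the puts, sends them in batches of up to
# 25 items, and resends any unprocessed items automatically
with table.batch_writer() as batch:
    for i in range(100):
        batch.put_item(Item={
            'PK': f'USER#{i}',
            'SK': 'PROFILE',
            'name': f'User {i}'
        })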
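# Batch get (up to 100 items per request; keys the service could not
# process are returned in response['UnprocessedKeys'] and must be retried)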
dynamodb = boto3.resource('dynamodb')
response = dynamodb.batch_get_item(
RequestItems={
'Users': {
'Keys': [
{'PK': 'USER#1', 'SK': 'PROFILE'},
{'PK': 'USER#2', 'SK': 'PROFILE'}
]
}
}
)
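Unlike `batch_writer`, `batch_get_item` does not retry on its own: keys that were throttled come back under `UnprocessedKeys`, in the same shape as `RequestItems`. A minimal retry loop might look like the following. The helper name, the retry limits, and the injected `batch_get_fn` parameter (for example `dynamodb.batch_get_item`) are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Dict, List


def batch_get_all(batch_get_fn: Callable[..., Dict[str, Any]],
                  request_items: Dict[str, Any],
                  max_rounds: int = 5,
                  base_delay: float = 0.05) -> Dict[str, List[Any]]:
    """Call batch_get_fn until no UnprocessedKeys remain, collecting items per table."""
    results: Dict[str, List[Any]] = {}
    pending = request_items
    for attempt in range(max_rounds):
        response = batch_get_fn(RequestItems=pending)
        for table_name, items in response.get("Responses", {}).items():
            results.setdefault(table_name, []).extend(items)
        pending = response.get("UnprocessedKeys") or {}
        if not pending:
            return results
        # Back off before retrying only the keys that were not processed.
        time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unprocessed keys remain after retries")
```

With the resource from the example above, a call would be `batch_get_all(dynamodb.batch_get_item, {'Users': {'Keys': [...]}})`.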
aws dynamodb update-table \
--table-name Users \
--attribute-definitions AttributeName=email,AttributeType=S \
--global-secondary-index-updates '[
{
"Create": {
"IndexName": "email-index",
"KeySchema": [{"AttributeName": "email", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"}
}
}
]'
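Once the index is active, it is queried by passing `IndexName` alongside a key condition on the index's own partition key. The sketch below only builds the request parameters for the low-level client, so it runs without AWS access; `email_index_query` is a hypothetical helper, and the table and index names are taken from the command above.

```python
from typing import Any, Dict


def email_index_query(email: str,
                      table_name: str = "Users",
                      index_name: str = "email-index") -> Dict[str, Any]:
    """Build low-level query parameters that look up items by email via the GSI."""
    return {
        "TableName": table_name,
        "IndexName": index_name,
        "KeyConditionExpression": "email = :email",
        # The low-level client expects DynamoDB's typed attribute format.
        "ExpressionAttributeValues": {":email": {"S": email}},
    }


if __name__ == "__main__":
    print(email_index_query("john.doe@example.com"))
```

The parameters would be used as `boto3.client('dynamodb').query(**email_index_query(...))`. Reads from a GSI are eventually consistent, so a freshly written item may not appear immediately.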
from botocore.exceptions import ClientError
# Only put if item doesn't exist
try:
    table.put_item(
        Item={'PK': 'USER#123', 'SK': 'PROFILE', 'name': 'John'},
        ConditionExpression='attribute_not_exists(PK)'
    )
except ClientError as e:
    if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
        print("Item already exists")
    else:
        # Re-raise anything that is not a failed condition check
        raise
# Optimistic locking with version
table.update_item(
Key={'PK': 'USER#123', 'SK': 'PROFILE'},
UpdateExpression='SET #name = :name, version = version + :inc',
ConditionExpression='version = :current_version',
ExpressionAttributeNames={'#name': 'name'},
ExpressionAttributeValues={
':name': 'New Name',
':inc': 1,
':current_version': 5
}
)
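When the version check fails, the usual response is to re-read the item and try again with the fresh version. The loop below sketches that pattern with injected functions so it can run without AWS access. `VersionConflict`, `update_with_retry`, `read_fn`, and `update_fn` are all hypothetical names; in real code `update_fn` would wrap `table.update_item` and translate a `ConditionalCheckFailedException` into `VersionConflict`.

```python
from typing import Any, Callable, Dict


class VersionConflict(Exception):
    """Raised by update_fn when the stored version differs from the expected one."""


def update_with_retry(read_fn: Callable[[], Dict[str, Any]],
                      update_fn: Callable[..., None],
                      new_name: str,
                      max_attempts: int = 3) -> int:
    """Read the item, attempt a versioned update, and retry on conflict.

    Returns the version number written by the successful update.
    """
    for _ in range(max_attempts):
        item = read_fn()
        try:
            update_fn(new_name=new_name, expected_version=item["version"])
            return item["version"] + 1
        except VersionConflict:
            # Another writer got there first; re-read and try again.
            continue
    raise VersionConflict(f"gave up after {max_attempts} attempts")
```

Keeping the attempt count small avoids hammering a hot item when many writers contend for it.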
| Command | Description |
|---------|-------------|
| aws dynamodb create-table | Create table |
| aws dynamodb describe-table | Get table info |
| aws dynamodb update-table | Modify table/indexes |
| aws dynamodb delete-table | Delete table |
| aws dynamodb list-tables | List all tables |

| Command | Description |
|---------|-------------|
| aws dynamodb put-item | Create/replace item |
| aws dynamodb get-item | Read single item |
| aws dynamodb update-item | Update item attributes |
| aws dynamodb delete-item | Delete item |
| aws dynamodb query | Query by key |
| aws dynamodb scan | Full table scan |

| Command | Description |
|---------|-------------|
| aws dynamodb batch-write-item | Batch write (25 max) |
| aws dynamodb batch-get-item | Batch read (100 max) |
| aws dynamodb transact-write-items | Transaction write |
| aws dynamodb transact-get-items | Transaction read |
Symptom: ProvisionedThroughputExceededException
Causes:
Solutions:
# Let botocore retry throttled requests: 'adaptive' mode adds client-side
# rate limiting on top of exponential backoff
import boto3
from botocore.config import Config
config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'adaptive'
    }
)
dynamodb = boto3.resource('dynamodb', config=config)
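For application-level retries that botocore does not cover (for example, resubmitting unprocessed batch items), the wait between attempts is commonly computed as exponential backoff with jitter. The following is a sketch of the "full jitter" variant; the function name, the default base and cap, and the injectable `rng` parameter are assumptions for illustration.

```python
import random
from typing import Callable, List


def backoff_delays(max_attempts: int,
                   base: float = 0.1,
                   cap: float = 5.0,
                   rng: Callable[[], float] = random.random) -> List[float]:
    """Return the delay in seconds to wait before each retry.

    The upper bound doubles with every attempt until it reaches `cap`;
    the actual delay is a random fraction of that bound (full jitter).
    """
    return [rng() * min(cap, base * (2 ** attempt))
            for attempt in range(max_attempts)]


if __name__ == "__main__":
    for attempt, delay in enumerate(backoff_delays(5), start=1):
        print(f"retry {attempt}: wait up to {delay:.3f}s")
```

Randomizing the delay spreads retries from many clients over time, which reduces the chance that they all hit the same throttled partition at once.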
Debug:
# Check consumed read capacity for the whole table (per-minute sums).
# The `date -d` syntax below requires GNU date.
aws cloudwatch get-metric-statistics \
--namespace AWS/DynamoDB \
--metric-name ConsumedReadCapacityUnits \
--dimensions Name=TableName,Value=Users \
--start-time $(date -d '1 hour ago' -u +%Y-%m-%dT%H:%M:%SZ) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%SZ) \
--period 60 \
--statistics Sum
Solutions:
Debug checklist:
Issue: Scans are slow and expensive
Solutions:
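# Parallel scan: each worker scans one segment and follows pagination,
# since a single scan call returns at most 1 MB of data
import concurrent.futures
def scan_segment(segment, total_segments):
    items = []
    kwargs = {'Segment': segment, 'TotalSegments': total_segments}
    while True:
        response = table.scan(**kwargs)
        items.extend(response['Items'])
        if 'LastEvaluatedKey' not in response:
            return items
        kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(
        lambda s: scan_segment(s, 4),
        range(4)
    ))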