.agents/skills/dagster-per-asset-healthchecks/SKILL.md
Split a monolithic Dagster job into per-asset schedules with individual healthchecks.io pings. Use when adding monitoring to Dagster assets, splitting a catch-all job, or wiring per-job healthchecks into the Nix bugster module.
npx skillsauth add edmundmiller/dotfiles dagster-per-asset-healthchecksInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Pattern for giving each Dagster asset its own schedule + healthchecks.io check.
One sync_all_hourly means a failure anywhere kills all monitoring signal.
Per-asset: each job has its own check, own schedule, own ping URL.
curl -s -X POST "https://healthchecks.io/api/v3/checks/" \
-H "X-Api-Key: <key>" \
-H "Content-Type: application/json" \
-d '{
"name": "bugster: <asset_name>",
"tags": "bugster nuc",
"grace": 3600,
"schedule": "0 * * * *",
"tz": "America/Chicago"
}' | jq '{name, ping_url, uuid}'
Delete an old combined check:
curl -s -X DELETE "https://healthchecks.io/api/v3/checks/<uuid>" \
-H "X-Api-Key: <key>"
API key in 1Password: hcw_Xxfgcx40LFjb2JJlDJvDainCDIXg
Build one ScheduleDefinition per loaded asset:
_schedules: list[dg.ScheduleDefinition] = []
_SCHEDULE_PING_URLS: dict[str, str] = {}
for _asset in _assets:
_asset_key = _asset.key.path[-1] # e.g. "github_personal_tasknotes"
_schedule_name = f"sync_{_asset_key}_hourly"
_schedules.append(
dg.ScheduleDefinition(
name=_schedule_name,
cron_schedule="0 * * * *",
target=dg.AssetSelection.assets(_asset_key),
default_status=dg.DefaultScheduleStatus.RUNNING, # ← critical
)
)
# Wire healthcheck URL if env var is set
_url = os.environ.get(f"HEALTHCHECK_PING_URL_{_asset_key.upper()}")
if _url:
_SCHEDULE_PING_URLS[_schedule_name] = _url
Gotcha: Schedules default to STOPPED. Always set default_status=DefaultScheduleStatus.RUNNING or they never fire on fresh deployments.
def _get_ping_url(context: dg.RunStatusSensorContext) -> str | None:
schedule_name = (context.dagster_run.tags or {}).get("dagster/schedule_name")
return _SCHEDULE_PING_URLS.get(schedule_name) if schedule_name else None
@dg.run_status_sensor(run_status=dg.DagsterRunStatus.STARTED, name="healthcheck_run_started")
def healthcheck_run_started(context):
if url := _get_ping_url(context):
requests.post(f"{url}/start?rid={context.dagster_run.run_id}", timeout=10)
@dg.run_status_sensor(run_status=dg.DagsterRunStatus.SUCCESS, name="healthcheck_run_success")
def healthcheck_run_success(context):
if url := _get_ping_url(context):
requests.post(f"{url}?rid={context.dagster_run.run_id}", timeout=10)
@dg.run_status_sensor(run_status=dg.DagsterRunStatus.FAILURE, name="healthcheck_run_failure")
def healthcheck_run_failure(context):
if url := _get_ping_url(context):
requests.post(f"{url}/fail?rid={context.dagster_run.run_id}", timeout=10)
_sensors = [healthcheck_run_started, healthcheck_run_success, healthcheck_run_failure] \
if _SCHEDULE_PING_URLS else []
In modules/services/dagster/bugster.nix:
healthcheckPingUrls = mkOpt (types.attrsOf types.str) { };
Generates env vars:
// lib.mapAttrs' (
name: url: lib.nameValuePair "HEALTHCHECK_PING_URL_${lib.strings.toUpper name}" url
) cfg.healthcheckPingUrls
Note: use lib.strings.toUpper — builtins.toUpper doesn't exist in NixOS Nix.
In hosts/nuc/default.nix:
bugster.healthcheckPingUrls = {
github_personal_tasknotes = "https://hc-ping.com/...";
linear_personal_tasknotes = "https://hc-ping.com/...";
travel_time_blocks = "https://hc-ping.com/...";
};
# Check schedule/sensor status
ssh nuc "curl -s http://127.0.0.1:3001/graphql -X POST -H 'Content-Type: application/json' \
-d '{\"query\": \"{ workspaceOrError { ... on Workspace { locationEntries { locationOrLoadError { ... on RepositoryLocation { repositories { schedules { name scheduleState { status } } sensors { name sensorState { status } } } } } } } } }\"}'" \
| python3 -m json.tool"
# Start a stopped schedule
SCHEDULE=sync_github_personal_tasknotes_hourly
ssh nuc "curl -s http://127.0.0.1:3001/graphql -X POST -H 'Content-Type: application/json' \
-d '{\"query\": \"mutation { startSchedule(scheduleSelector: { repositoryLocationName: \\\"grpc:localhost:4000\\\", repositoryName: \\\"__repository__\\\", scheduleName: \\\"$SCHEDULE\\\" }) { ... on ScheduleStateResult { scheduleState { status } } } }\"}'"
The bugster-setup service runs git reset --hard origin/main, so local edits get wiped.
Always push changes to GitHub via root + emiller's SSH key:
cat > /tmp/push.sh << 'SCRIPT'
#!/bin/bash
set -euo pipefail
cd /var/lib/dagster/bugster
git add -A && git commit -m "your message"
GIT_SSH_COMMAND="ssh -i /home/emiller/.ssh/id_ed25519 -o StrictHostKeyChecking=accept-new" \
git push origin main
SCRIPT
scp /tmp/push.sh nuc:/tmp/push.sh && ssh nuc "sudo bash /tmp/push.sh"
development
Read-only Linear issue access via the Linear GraphQL API.
data-ai
## <!-- Purpose: Teach agents fast day-to-day memory browse/search/read/sync workflows in pi-context-repo. --> name: searching-memory description: > Search, browse, and inspect memory quickly in pi-context-repo. Use when asked to find prior notes, inspect memory files, locate preferences, or sync recent memory updates. Trigger phrases: "search memory", "list memory files", "find in memory", "read memory file", "memory status", "sync memory". --- # Searching Memory Use this workflow for fast
development
Comprehensive guide for initializing or reorganizing agent memory into a deeply hierarchical file structure. Use when running /init, when user asks to set up memory, or when memory needs a major reorganization. Trigger phrases: "initialize memory", "set up memory", "populate memory", "build my memory", "memory init".
data-ai
Decomposes and reorganizes agent memory files into focused, single-purpose components. Use when memory has large multi-topic blocks, redundancy, or poor organization. Trigger phrases: "defrag memory", "reorganize memory", "clean up memory files", "split memory blocks".