ies/music-topos/.agents/skills/pulse-mcp-stream/SKILL.md
Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence
npx skillsauth add plurigrid/asi pulse-mcp-streamInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Layer 1: Real-Time Social Stream Monitoring via MCP
Version: 1.1.0 (music-topos enhanced) Trit: +1 (Generator - produces live data) Bundle: acquisition
Pulse-MCP-stream provides real-time monitoring of social interactions, enabling the cognitive surrogate system to stay updated with the latest patterns. It streams mentions, engagement changes, and trending topics.
// pulse-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server";
import { Firehose } from "@atproto/sync";
import * as duckdb from "duckdb";
const server = new Server({
name: "pulse-mcp-stream",
version: "1.0.0"
});
// Connect to DuckDB for persistence
const db = new duckdb.Database("pulse_stream.duckdb");
server.setRequestHandler("subscribe", async (params) => {
const { actor, filters } = params;
const firehose = new Firehose({
service: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
});
firehose.on("create", async (event) => {
if (event.author === actor) {
// Store in DuckDB
await db.run(`
INSERT INTO pulse_events (event_id, event_type, actor_did, text, created_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
`, [event.uri, event.type, event.author, event.record?.text]);
}
});
await firehose.start();
return { status: "subscribed", actor };
});
CREATE TABLE pulse_events (
event_id VARCHAR PRIMARY KEY,
event_type VARCHAR, -- 'post', 'reply', 'like', 'repost', 'mention'
actor_did VARCHAR,
actor_handle VARCHAR,
subject_uri VARCHAR,
text TEXT,
created_at TIMESTAMP,
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
gay_color VARCHAR -- Deterministic color via SPI seed
);
CREATE TABLE engagement_deltas (
delta_id VARCHAR PRIMARY KEY,
post_uri VARCHAR,
likes_delta INT,
reposts_delta INT,
replies_delta INT,
velocity FLOAT, -- engagements per minute
measured_at TIMESTAMP
);
-- Real-time velocity tracking
CREATE VIEW v_post_velocity AS
SELECT
post_uri,
COUNT(*) FILTER (WHERE event_type = 'like') as likes,
COUNT(*) FILTER (WHERE event_type = 'repost') as reposts,
COUNT(*) / (EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 60.0) as velocity_per_min
FROM pulse_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY post_uri;
# pulse_client.py
import asyncio
import duckdb
from dataclasses import dataclass
from typing import AsyncIterator
@dataclass
class PulseEvent:
event_id: str
event_type: str
actor: str
text: str
created_at: str
class PulseClient:
def __init__(self, db_path: str = "pulse_stream.duckdb", seed: int = 0xf061ebbc2ca74d78):
self.db = duckdb.connect(db_path)
self.seed = seed
async def subscribe_actor(self, actor: str) -> AsyncIterator[PulseEvent]:
"""Subscribe to real-time updates for a user."""
# Poll DuckDB for new events
last_id = ""
while True:
result = self.db.execute("""
SELECT * FROM pulse_events
WHERE actor_handle = ? AND event_id > ?
ORDER BY created_at
LIMIT 10
""", [actor, last_id]).fetchall()
for row in result:
last_id = row[0]
yield PulseEvent(*row[:5])
await asyncio.sleep(1)
async def detect_trends(self, center_user: str, window_minutes: int = 60):
"""Detect trending topics in user's network."""
return self.db.execute("""
WITH word_counts AS (
SELECT
UNNEST(STRING_SPLIT(LOWER(text), ' ')) as word,
COUNT(*) as mentions
FROM pulse_events
WHERE created_at > NOW() - INTERVAL ? MINUTE
GROUP BY word
)
SELECT word, mentions
FROM word_counts
WHERE LENGTH(word) > 3
ORDER BY mentions DESC
LIMIT 10
""", [window_minutes]).fetchall()
# lib/pulse_stream.rb
require 'duckdb'
module PulseStream
def self.connect(db_path: "pulse_stream.duckdb")
@db = DuckDB::Database.open(db_path)
@conn = @db.connect
end
def self.latest_events(actor:, limit: 10)
@conn.query(<<~SQL, actor, limit)
SELECT event_id, event_type, text, created_at
FROM pulse_events
WHERE actor_handle = ?
ORDER BY created_at DESC
LIMIT ?
SQL
end
def self.velocity(post_uri:)
result = @conn.query(<<~SQL, post_uri)
SELECT velocity_per_min FROM v_post_velocity WHERE post_uri = ?
SQL
result.first&.first || 0.0
end
def self.viral?(post_uri:, threshold: 5.0)
velocity(post_uri: post_uri) > threshold
end
end
| Trit | Skill | Role | |------|-------|------| | -1 | influence-propagation | Validates network patterns | | 0 | bisimulation-game | Coordinates equivalence | | +1 | pulse-mcp-stream | Generates live data |
Conservation: (-1) + (0) + (+1) = 0 ✓
{
"mcpServers": {
"pulse": {
"command": "node",
"args": ["pulse-mcp-server/dist/index.js"],
"env": {
"DUCKDB_PATH": "pulse_stream.duckdb",
"GAY_SEED": "0xf061ebbc2ca74d78"
}
}
}
}
# Start pulse stream
pulse-start actor="barton.bsky.social":
python3 -c "import asyncio; from pulse_client import PulseClient; asyncio.run(PulseClient().subscribe_actor('{{actor}}'))"
# Check velocity
pulse-velocity uri:
ruby -I lib -r pulse_stream -e "PulseStream.connect; puts PulseStream.velocity(post_uri: '{{uri}}')"
# Detect trends
pulse-trends window="60":
duckdb pulse_stream.duckdb -c "SELECT * FROM v_post_velocity WHERE velocity_per_min > 1.0 LIMIT 10"
atproto-ingest (Layer 1) - Batch data collectioninfluence-propagation (Layer 7) - Network analysiscognitive-surrogate (Layer 6) - Pattern consumptionduckdb-temporal-versioning - Time-travel queriesdevelopment
BDD-Driven Mathematical Content Verification Skill Combines Behavior-Driven Development with mathematical formula extraction, verification, and transformation using: - Cucumber/Gherkin for specification - RSpec for implementation verification - mathpix-gem for LaTeX/mathematical content extraction - Pattern matching on syntax trees for formula validation Enables iterative discovery and verification of mathematical properties through executable specifications.
tools
Meta-skill that generates domain-specific AI skills from tool documentation
development
Code Query with AI-enhanced deterministic analysis via SplitMix ternary classification
development
Directed Yoneda lemma as directed path induction. Riehl-Shulman's key insight for synthetic ∞-categories.