external/anthropic-cybersecurity-skills/skills/building-threat-intelligence-feed-integration/SKILL.md
Builds automated threat intelligence feed integration pipelines connecting STIX/TAXII feeds, open-source threat intel, and commercial TI platforms into SIEM and security tools for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intelligence by automating feed ingestion, normalization, scoring, and distribution to detection systems.
npx skillsauth add seikaikyo/dash-skills building-threat-intelligence-feed-integration
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Use this skill when:
Do not use for manual IOC lookup — use dedicated enrichment tools (VirusTotal, AbuseIPDB) for ad-hoc queries.
taxii2-client, stix2 Python packages)
Map available feeds by type, format, and update frequency:
| Feed Source | Format | IOC Types | Update Freq | Cost |
|-------------|--------|-----------|-------------|------|
| AlienVault OTX | STIX/JSON | IP, Domain, Hash, URL | Real-time | Free |
| Abuse.ch URLhaus | CSV/JSON | URL, Domain | Every 5 min | Free |
| Abuse.ch MalwareBazaar | JSON API | File Hash | Real-time | Free |
| CISA AIS | STIX/TAXII 2.1 | All types | Daily | Free (US Gov) |
| CrowdStrike Intel | STIX/JSON | All types + Actor TTP | Real-time | Commercial |
| Mandiant Advantage | STIX 2.1 | All types + Reports | Real-time | Commercial |
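The inventory above can also be kept as data so that a scheduler knows when each feed is due. This is a minimal sketch, not part of the skill itself: the names `Feed`, `FEEDS`, and `due_feeds` are hypothetical, and because the table only says "Real-time" for several feeds, the 15-minute interval below is an illustrative assumption rather than a vendor recommendation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Feed:
    name: str
    fmt: str
    poll_every: timedelta  # assumed polling interval, not a vendor value


# Intervals for URLhaus and CISA AIS follow the table; 15 minutes for the
# "Real-time" feed is an illustrative assumption.
FEEDS = [
    Feed("AlienVault OTX", "STIX/JSON", timedelta(minutes=15)),
    Feed("Abuse.ch URLhaus", "CSV/JSON", timedelta(minutes=5)),
    Feed("CISA AIS", "STIX/TAXII 2.1", timedelta(days=1)),
]


def due_feeds(last_polled, now):
    """Return the names of feeds whose polling interval has elapsed.

    last_polled maps feed name -> datetime of the last successful poll;
    feeds that were never polled are always due.
    """
    due = []
    for feed in FEEDS:
        last = last_polled.get(feed.name)
        if last is None or now - last >= feed.poll_every:
            due.append(feed.name)
    return due


now = datetime(2024, 3, 15, 12, 0, tzinfo=timezone.utc)
last_polled = {
    "AlienVault OTX": now - timedelta(minutes=10),
    "Abuse.ch URLhaus": now - timedelta(minutes=6),
}
print(due_feeds(last_polled, now))  # ['Abuse.ch URLhaus', 'CISA AIS']
```

Passing `now` in explicitly keeps the function deterministic and easy to test.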
Connect to a TAXII 2.1 server and download indicators:
from datetime import datetime, timedelta, timezone

from stix2 import parse
from taxii2client.v21 import Collection, Server

# Connect to TAXII server (example: CISA AIS)
server = Server(
    "https://taxii.cisa.gov/taxii2/",
    user="your_username",
    password="your_password",
)

# List available collections
for api_root in server.api_roots:
    print(f"API Root: {api_root.title}")
    for collection in api_root.collections:
        print(f"  Collection: {collection.title} (ID: {collection.id})")

# Fetch indicators from a collection
collection = Collection(
    "https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
    user="your_username",
    password="your_password",
)

# Get indicators added in the last 24 hours (TAXII timestamps are UTC)
added_after = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
response = collection.get_objects(added_after=added_after, type=["indicator"])

for obj in response.get("objects", []):
    # allow_custom keeps vendor-specific x_ properties from failing the parse
    indicator = parse(obj, allow_custom=True)
    print(f"Type: {indicator.type}")
    print(f"Pattern: {indicator.pattern}")
    # valid_until and confidence are optional in STIX 2.1, so read them with .get()
    print(f"Valid Until: {indicator.get('valid_until')}")
    print(f"Confidence: {indicator.get('confidence')}")
    print("---")
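Polling a fixed 24-hour window re-downloads objects on every run and misses them if a run is skipped for more than a day. One common approach is to persist the time of the last successful poll and pass it as `added_after` on the next run. The sketch below assumes a hypothetical local JSON state file and hypothetical helper names (`to_taxii_timestamp`, `load_checkpoint`, `save_checkpoint`); none of them come from `taxii2-client`.

```python
import json
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path


def to_taxii_timestamp(dt):
    """Format an aware datetime as a UTC string with millisecond precision."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


def load_checkpoint(path, default_lookback=timedelta(days=1), now=None):
    """Return the last poll time, or now minus default_lookback on first run."""
    now = now or datetime.now(timezone.utc)
    if path.exists():
        return datetime.fromisoformat(json.loads(path.read_text())["last_poll"])
    return now - default_lookback


def save_checkpoint(path, when):
    """Record the time of the last successful poll."""
    path.write_text(json.dumps({"last_poll": when.isoformat()}))


# Hypothetical state file; a real pipeline would keep one per collection.
state = Path(tempfile.gettempdir()) / "taxii_checkpoint_example.json"
state.unlink(missing_ok=True)

run_time = datetime(2024, 3, 15, 0, 0, tzinfo=timezone.utc)
first = load_checkpoint(state, now=run_time)   # no file yet: falls back to 24 h
save_checkpoint(state, run_time)
second = load_checkpoint(state, now=run_time)  # resumes from the saved time

print(to_taxii_timestamp(first))   # 2024-03-14T00:00:00.000Z
print(to_taxii_timestamp(second))  # 2024-03-15T00:00:00.000Z
state.unlink()
```

Saving the checkpoint only after the downstream push succeeds means a failed run is retried from the same point instead of silently skipping indicators.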
Abuse.ch URLhaus Feed:
import csv
from io import StringIO

import requests

# Download URLhaus recent URLs
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/", timeout=30)
response.raise_for_status()

reader = csv.reader(StringIO(response.text), delimiter=",")
indicators = []
for row in reader:
    # Skip blank lines and the "#" comment header
    if not row or row[0].startswith("#"):
        continue
    indicators.append({
        "id": row[0],
        "dateadded": row[1],
        "url": row[2],
        "url_status": row[3],
        "threat": row[5],
        "tags": row[6],
    })
print(f"Ingested {len(indicators)} URLs from URLhaus")

# Filter for active threats only
active = [i for i in indicators if i["url_status"] == "online"]
print(f"Active threats: {len(active)}")
AlienVault OTX Pulse Feed:
from datetime import datetime, timedelta, timezone

from OTXv2 import OTXv2

otx = OTXv2("YOUR_OTX_API_KEY")

# Get subscribed pulses modified in the last 24 hours
since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S")
pulses = otx.getall(modified_since=since)
for pulse in pulses:
    print(f"Pulse: {pulse['name']}")
    print(f"Tags: {pulse['tags']}")
    for indicator in pulse["indicators"]:
        print(f"  IOC: {indicator['indicator']} ({indicator['type']})")
Abuse.ch Feodo Tracker (C2 IPs):
import requests

response = requests.get(
    "https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json",
    timeout=30,
)
response.raise_for_status()
c2_data = response.json()
for entry in c2_data:
    print(f"IP: {entry['ip_address']}:{entry['port']}")
    print(f"Malware: {entry['malware']}")
    print(f"First Seen: {entry['first_seen']}")
    print(f"Last Online: {entry['last_online']}")
Convert all feeds to STIX 2.1 format for standardization:
from datetime import datetime, timezone

from stix2 import Bundle, Indicator


def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
    """Convert a raw IOC to a STIX 2.1 indicator."""
    # Escape backslashes and single quotes so the value cannot break the pattern
    value = ioc_value.replace("\\", "\\\\").replace("'", "\\'")
    pattern_map = {
        "ipv4": f"[ipv4-addr:value = '{value}']",
        "domain": f"[domain-name:value = '{value}']",
        "url": f"[url:value = '{value}']",
        "sha256": f"[file:hashes.'SHA-256' = '{value}']",
        "md5": f"[file:hashes.MD5 = '{value}']",
    }
    return Indicator(
        name=f"{ioc_type}: {ioc_value}",
        pattern=pattern_map[ioc_type],
        pattern_type="stix",
        valid_from=datetime.now(timezone.utc),  # time of ingestion, not a fixed date
        confidence=confidence,
        labels=[source],
        custom_properties={"x_source_feed": source},
    )


# Deduplicate across sources.
# all_collected_iocs is the combined list of dicts gathered from the feeds,
# each with "type", "value", and "source" keys.
seen_iocs = set()
unique_indicators = []
for ioc in all_collected_iocs:
    key = (ioc["type"], ioc["value"])
    if key not in seen_iocs:
        seen_iocs.add(key)
        unique_indicators.append(
            create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
        )

bundle = Bundle(objects=unique_indicators)
print(f"Unique indicators: {len(unique_indicators)}")
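The loop above keeps the first copy of each IOC and drops the rest, which loses the fact that several feeds reported it. Below is a minimal sketch of a merge that preserves multi-source attribution and raises confidence when feeds corroborate each other. The per-feed reliability weights, the fallback weight, and the noisy-OR combination rule are all assumptions for illustration; this skill does not define a scoring formula.

```python
# Hypothetical reliability per feed, in the range 0..1; tune for your environment.
SOURCE_RELIABILITY = {"urlhaus": 0.7, "otx": 0.5, "feodo": 0.8}
DEFAULT_RELIABILITY = 0.3  # assumed value for feeds not listed above


def merge_iocs(iocs):
    """Collapse duplicate IOCs while keeping every source that reported them.

    Each input dict needs "type", "value" and "source" keys. Confidence is a
    noisy-OR of the source reliabilities, scaled to the STIX 0-100 range.
    """
    merged = {}
    for ioc in iocs:
        value = ioc["value"].strip()
        # URL paths are case-sensitive; other types are normalized to lowercase
        if ioc["type"] != "url":
            value = value.lower()
        key = (ioc["type"], value)
        entry = merged.setdefault(
            key, {"type": ioc["type"], "value": value, "sources": set()}
        )
        entry["sources"].add(ioc["source"])

    for entry in merged.values():
        miss = 1.0  # probability that every reporting source is wrong
        for source in entry["sources"]:
            miss *= 1.0 - SOURCE_RELIABILITY.get(source, DEFAULT_RELIABILITY)
        entry["confidence"] = round((1.0 - miss) * 100)
        entry["sources"] = sorted(entry["sources"])
    return list(merged.values())


sample = [
    {"type": "ipv4", "value": "203.0.113.10", "source": "feodo"},
    {"type": "ipv4", "value": "203.0.113.10", "source": "otx"},
    {"type": "domain", "value": "Bad.example", "source": "otx"},
]
for row in merge_iocs(sample):
    print(row)
```

With these weights the IP reported by two feeds scores 90, while the domain seen by one feed scores 50. The merged `confidence` and `sources` values could then be passed to `create_stix_indicator`.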
Push to Splunk ES Threat Intelligence:
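import json
import os
import re

import requests

splunk_url = "https://splunk.company.com:8089"
splunk_token = os.environ["SPLUNK_TOKEN"]  # assumed env var holding a Splunk auth token
headers = {"Authorization": f"Bearer {splunk_token}"}
# Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"

for indicator in unique_indicators:
    # ip_intel only holds IP indicators; other IOC types belong in other collections
    if not indicator.pattern.startswith("[ipv4-addr:"):
        continue
    # Extract IOC value from STIX pattern
    ioc_value = re.search(r"= '(.*)'\]$", indicator.pattern).group(1)
    # Upload to Splunk ES threat intel collection
    item = {
        "ip": ioc_value,
        "description": indicator.name,
        "weight": indicator.confidence // 10,
        "threat_key": indicator.id,
        "source_feed": indicator.get("x_source_feed", "unknown"),
    }
    resp = requests.post(
        f"{splunk_url}/services/data/threat_intel/item/ip_intel",
        headers=headers,
        # The ES endpoint takes the record as JSON in the "item" form field;
        # verify against the REST API reference for your ES version
        data={"item": json.dumps(item)},
        verify=verify_tls,
        timeout=30,
    )
    resp.raise_for_status()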
Push to MISP for centralized management:
from datetime import datetime

from pymisp import MISPEvent, PyMISP

misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")

# Create event for feed batch
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2  # Medium
event.analysis = 2  # Completed

# Left-hand side of the STIX pattern -> MISP attribute type
misp_types = {
    "[ipv4-addr:value": "ip-dst",
    "[domain-name:value": "domain",
    "[url:value": "url",
    "[file:hashes.'SHA-256'": "sha256",
    "[file:hashes.MD5": "md5",
}

# Add indicators as attributes
for ioc in unique_indicators:
    lhs, _, rhs = ioc.pattern.partition(" = ")
    event.add_attribute(
        type=misp_types[lhs],
        value=rhs[1:-2],  # drop the opening quote and the closing quote + bracket
        to_ids=True,
        comment=f"Source: {ioc.get('x_source_feed', 'mixed')}",
    )

result = misp.add_event(event)
print(f"MISP Event created: {result['Event']['id']}")
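Both push steps need the IOC type and value back out of the STIX pattern. For the single-comparison patterns produced by `create_stix_indicator`, that can be sketched with one regular expression. The helper name `parse_simple_pattern` is hypothetical. It deliberately rejects compound patterns (`AND`, `OR`, `FOLLOWEDBY`), which need a real STIX pattern parser.

```python
import re

# Matches patterns of the form [object:path = 'value'] and nothing more complex.
_SIMPLE_PATTERN = re.compile(
    r"^\[\s*([a-z0-9-]+):([^\s=]+)\s*=\s*'((?:[^'\\]|\\.)*)'\s*\]$"
)

# STIX object path -> short IOC type used elsewhere in this pipeline
_TYPE_BY_PATH = {
    ("ipv4-addr", "value"): "ipv4",
    ("domain-name", "value"): "domain",
    ("url", "value"): "url",
    ("file", "hashes.'SHA-256'"): "sha256",
    ("file", "hashes.MD5"): "md5",
}


def parse_simple_pattern(pattern):
    """Return (ioc_type, value) for a single-comparison STIX pattern."""
    match = _SIMPLE_PATTERN.match(pattern)
    if match is None:
        raise ValueError(f"unsupported STIX pattern: {pattern!r}")
    obj_type, path, raw_value = match.groups()
    ioc_type = _TYPE_BY_PATH.get((obj_type, path))
    if ioc_type is None:
        raise ValueError(f"unsupported object path: {obj_type}:{path}")
    # Undo the backslash escaping used inside STIX string literals
    value = re.sub(r"\\(.)", r"\1", raw_value)
    return ioc_type, value


print(parse_simple_pattern("[ipv4-addr:value = '203.0.113.10']"))
print(parse_simple_pattern("[file:hashes.'SHA-256' = 'abc123']"))
```

Anchoring on the final quoted string matters for hash patterns, where the property name `'SHA-256'` is itself quoted and appears before the value.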
Track feed effectiveness metrics:
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
    dc(threat_key) AS unique_iocs
    by source_feed
| join type=left source_feed [
    search index=notable source="Threat Intelligence"
    | stats count AS matches by source_feed
  ]
| fillnull value=0 matches
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
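The match rate in the query is matches divided by unique IOCs, expressed as a percentage. As a cross-check outside Splunk, the same arithmetic can be sketched in Python. The function name `match_rates` is hypothetical, and the counts are three rows taken from the sample daily report in this skill.

```python
def match_rates(feeds):
    """Return (feed, unique_iocs, matches, match_rate_percent) sorted by rate.

    feeds maps feed name -> (unique_iocs, matches). Feeds with no IOCs get a
    rate of 0.0 instead of raising ZeroDivisionError.
    """
    rows = []
    for name, (unique_iocs, matches) in feeds.items():
        rate = round(matches / unique_iocs * 100, 2) if unique_iocs else 0.0
        rows.append((name, unique_iocs, matches, rate))
    return sorted(rows, key=lambda row: row[3], reverse=True)


report = match_rates({
    "Abuse.ch URLhaus": (12340, 47),
    "AlienVault OTX": (18567, 23),
    "Abuse.ch Feodo": (1203, 12),
})
for name, unique_iocs, matches, rate in report:
    print(f"{name:<18} {unique_iocs:>6} {matches:>4} {rate:>5.2f}%")
```

A feed with a large IOC count but a near-zero match rate adds lookup cost without detections, which makes it a candidate for review.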
| Term | Definition |
|------|-----------|
| STIX 2.1 | Structured Threat Information Expression: standardized JSON format for sharing threat intelligence objects |
| TAXII | Trusted Automated eXchange of Indicator Information: transport protocol for sharing STIX data via REST API |
| TIP | Threat Intelligence Platform: centralized system for aggregating, scoring, and distributing threat intelligence |
| IOC Scoring | Process of assigning confidence values to indicators based on source reliability and corroboration |
| Feed Deduplication | Removing duplicate IOCs across multiple sources while preserving multi-source attribution |
| IOC Expiration | Time-to-live policy removing aged indicators (IP: 30 days, Domain: 90 days, Hash: 1 year) |
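The IOC Expiration policy can be sketched as a lookup from IOC type to time-to-live. The TTLs for IP, domain, and hash come from the definition above. The function name `is_expired`, the `last_seen` field, and the 30-day fallback for types the policy does not mention are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone

# TTLs from the IOC Expiration definition: IP 30 days, domain 90 days, hash 1 year.
TTL_BY_TYPE = {
    "ipv4": timedelta(days=30),
    "domain": timedelta(days=90),
    "sha256": timedelta(days=365),
    "md5": timedelta(days=365),
}
DEFAULT_TTL = timedelta(days=30)  # assumed fallback for types the policy omits


def is_expired(ioc, now):
    """Return True when the IOC has not been seen within its TTL."""
    ttl = TTL_BY_TYPE.get(ioc["type"], DEFAULT_TTL)
    return now - ioc["last_seen"] > ttl


now = datetime(2024, 3, 15, tzinfo=timezone.utc)
iocs = [
    {"type": "ipv4", "value": "203.0.113.10", "last_seen": now - timedelta(days=45)},
    {"type": "domain", "value": "bad.example", "last_seen": now - timedelta(days=45)},
    {"type": "sha256", "value": "abc123", "last_seen": now - timedelta(days=400)},
]
active = [ioc for ioc in iocs if not is_expired(ioc, now)]
print([ioc["value"] for ioc in active])  # ['bad.example']
```

IP addresses get the shortest TTL in this policy; a domain of the same age is still inside its 90-day window.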
THREAT INTEL FEED STATUS — Daily Report
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Date: 2024-03-15
Total IOCs: 45,892 active indicators
Feed Health:
  Feed                 IOCs     Matches   Match Rate   Status
  Abuse.ch URLhaus     12,340   47        0.38%        HEALTHY
  AlienVault OTX       18,567   23        0.12%        HEALTHY
  Abuse.ch Feodo       1,203    12        1.00%        HEALTHY
  CISA AIS             8,945    8         0.09%        HEALTHY
  CrowdStrike Intel    4,837    31        0.64%        HEALTHY
Actions Today:
  New IOCs ingested:   1,247
  IOCs expired:        892
  Duplicates removed:  156
  SIEM matches:        121 notable events generated
  False positives:     3 (CDN IPs removed from feed)