skills/building-ioc-enrichment-pipeline-with-opencti/SKILL.md
OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This skill covers building an automated IOC enrichment pipeline using OpenCTI's connector ecosystem to enrich indicators with context from VirusTotal, Shodan, AbuseIPDB, GreyNoise, and other sources. The pipeline automatically enriches newly ingested indicators, correlates them with known threat actors and campaigns, and scores them for analyst prioritization.
The custom connector code in this skill relies on the pycti library.
OpenCTI uses a GraphQL API frontend backed by Elasticsearch for storage and Redis/RabbitMQ for connector communication. Data is natively stored as STIX 2.1 objects with relationships. Connectors are categorized as: External Import (feed ingestion), Internal Import (file parsing), Internal Enrichment (context addition), and Stream (real-time export).
Internal enrichment connectors are triggered automatically when new observables are created or manually by analysts. Each connector receives STIX objects, queries external services, and returns STIX 2.1 bundles that augment the original observable with additional context, labels, and relationships.
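To make the shape of a returned bundle concrete, here is a minimal sketch that builds one with plain dictionaries and the standard library only. The helper names `make_note` and `make_bundle`, the observable ID, and the fixed timestamp are illustrative assumptions, not part of OpenCTI or pycti; a real connector would normally build these objects with the stix2 library.

```python
import json
import uuid

# Hypothetical STIX ID of the observable being enriched (placeholder value).
OBSERVABLE_ID = "ipv4-addr--00000000-0000-4000-8000-000000000001"


def make_note(observable_id, abstract, content, timestamp):
    """Build a STIX 2.1 note that points back at the enriched observable."""
    return {
        "type": "note",
        "spec_version": "2.1",
        "id": f"note--{uuid.uuid4()}",
        "created": timestamp,
        "modified": timestamp,
        "abstract": abstract,
        "content": content,
        "object_refs": [observable_id],
    }


def make_bundle(objects):
    """Wrap STIX objects in a bundle envelope."""
    return {"type": "bundle", "id": f"bundle--{uuid.uuid4()}", "objects": objects}


note = make_note(
    OBSERVABLE_ID,
    abstract="Example enrichment",
    content="Context returned by an external service goes here.",
    timestamp="2024-01-01T00:00:00.000Z",  # fixed value keeps the example deterministic
)
bundle = make_bundle([note])
serialized = json.dumps(bundle, indent=2)
```

The key detail is `object_refs`: it carries the STIX ID of the original observable, which is how the added context is attached to it when the bundle is ingested.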
OpenCTI uses a 0-100 confidence scale for indicators. Enrichment connectors can update confidence scores based on external validation: VirusTotal detection ratios, Shodan exposure data, AbuseIPDB report counts, and GreyNoise classification results.
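One way to combine these signals into a single 0-100 value is a weighted sum, sketched below. The weights (50/30/20) and the function name `score_indicator` are assumptions chosen for illustration and are not taken from OpenCTI or any connector; tune them to your own analyst feedback.

```python
def score_indicator(vt_malicious, vt_total, abuse_confidence, greynoise_classification):
    """Return a 0-100 confidence value from three enrichment signals.

    vt_malicious / vt_total: VirusTotal engines flagging the IOC vs. engines that scanned it.
    abuse_confidence: AbuseIPDB abuse confidence score, already on a 0-100 scale.
    greynoise_classification: "malicious", "benign", or "unknown".
    """
    # Detection ratio contributes up to 50 points (assumed weight).
    vt_ratio = vt_malicious / vt_total if vt_total else 0.0
    score = vt_ratio * 50

    # AbuseIPDB contributes up to 30 points (assumed weight).
    score += (abuse_confidence / 100) * 30

    # GreyNoise nudges the score up or down by 20 points (assumed weight).
    adjustment = {"malicious": 20, "benign": -20}
    score += adjustment.get(greynoise_classification, 0)

    # Clamp to the 0-100 scale before rounding to an integer.
    return int(round(max(0.0, min(100.0, score))))


high = score_indicator(35, 70, 80, "malicious")
low = score_indicator(0, 70, 0, "benign")
```

With these weights, an IP flagged by half of the VirusTotal engines, scored 80 by AbuseIPDB, and classified as malicious by GreyNoise lands at 69, while a clean, benign address is clamped to 0.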
# docker-compose.yml (key services)
version: '3'
services:
  opencti:
    image: opencti/platform:6.4.4
    environment:
      - APP__PORT=8080
      - APP__ADMIN__EMAIL=your-admin-email@example.com
      - APP__ADMIN__PASSWORD=ChangeMeNow
      - APP__ADMIN__TOKEN=your-admin-token-uuid
      - ELASTICSEARCH__URL=http://elasticsearch:9200
      - MINIO__ENDPOINT=minio
      - RABBITMQ__HOSTNAME=rabbitmq
      - REDIS__HOSTNAME=redis
    ports:
      - "8080:8080"
    depends_on:
      - elasticsearch
      - minio
      - rabbitmq
      - redis

  connector-virustotal:
    image: opencti/connector-virustotal:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-virustotal-id
      - CONNECTOR_NAME=VirusTotal
      - CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
      - CONNECTOR_AUTO=true
      - VIRUSTOTAL_TOKEN=your-vt-api-key
      - VIRUSTOTAL_MAX_TLP=TLP:AMBER

  connector-shodan:
    image: opencti/connector-shodan:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-shodan-id
      - CONNECTOR_NAME=Shodan
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - SHODAN_TOKEN=your-shodan-api-key
      - SHODAN_MAX_TLP=TLP:AMBER

  connector-abuseipdb:
    image: opencti/connector-abuseipdb:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-abuseipdb-id
      - CONNECTOR_NAME=AbuseIPDB
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - ABUSEIPDB_API_KEY=your-abuseipdb-key
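The placeholder values in the compose file must be replaced before the connectors will register. The sketch below checks the three settings every connector above shares. It assumes that `CONNECTOR_ID` should be a valid UUID, which is how OpenCTI's connector documentation describes it as far as I know; the function name `check_connector_env` is hypothetical.

```python
import uuid

# Settings shared by every connector service in the compose file above.
REQUIRED_SETTINGS = ("OPENCTI_URL", "OPENCTI_TOKEN", "CONNECTOR_ID")


def check_connector_env(env):
    """Return a list of problems; an empty list means the settings look usable."""
    problems = [f"{name} is not set" for name in REQUIRED_SETTINGS if not env.get(name)]

    connector_id = env.get("CONNECTOR_ID")
    if connector_id:
        try:
            uuid.UUID(connector_id)  # assumption: the ID is expected to be a UUID
        except ValueError:
            problems.append("CONNECTOR_ID is not a valid UUID")
    return problems


# The placeholder values from the compose file fail the UUID check.
placeholder_env = {
    "OPENCTI_URL": "http://opencti:8080",
    "OPENCTI_TOKEN": "your-admin-token-uuid",
    "CONNECTOR_ID": "connector-shodan-id",
}
placeholder_problems = check_connector_env(placeholder_env)
```

In a connector entry point, this could be called as `check_connector_env(os.environ)` before the helper is created, so a misconfigured container fails with a readable message.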
import os

import requests
from pycti import OpenCTIConnectorHelper
from stix2 import Bundle, Note


class CustomEnrichmentConnector:
    def __init__(self):
        config = {
            "opencti": {
                "url": os.environ.get("OPENCTI_URL"),
                "token": os.environ.get("OPENCTI_TOKEN"),
            },
            "connector": {
                "id": os.environ.get("CONNECTOR_ID"),
                "name": "CustomEnrichment",
                "scope": "IPv4-Addr,Domain-Name,Url",
                "auto": True,
                "type": "INTERNAL_ENRICHMENT",
            },
        }
        self.helper = OpenCTIConnectorHelper(config)

    def start(self):
        # Blocks and hands every enrichment request to _process_message.
        self.helper.listen(self._process_message)

    def _process_message(self, data):
        entity_id = data["entity_id"]
        stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)
        if not stix_object:
            return "Observable not found"

        observable_type = stix_object["entity_type"]
        observable_value = stix_object.get("value", "")
        # STIX references must use the "type--uuid" standard ID.
        stix_id = stix_object.get("standard_id", entity_id)

        enrichment_results = []
        if observable_type == "IPv4-Addr":
            enrichment_results = self._enrich_ip(observable_value, entity_id, stix_id)
        elif observable_type == "Domain-Name":
            enrichment_results = self._enrich_domain(observable_value, stix_id)

        if enrichment_results:
            bundle = Bundle(objects=enrichment_results, allow_custom=True)
            self.helper.send_stix2_bundle(bundle.serialize())
        return "Enrichment completed"

    def _enrich_ip(self, ip_address, entity_id, stix_id):
        """Enrich an IP address with GreyNoise context."""
        objects = []
        # GreyNoise Community API
        try:
            gn_response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": os.environ.get("GREYNOISE_API_KEY")},
                timeout=30,
            )
            if gn_response.status_code == 200:
                gn_data = gn_response.json()
                classification = gn_data.get("classification", "unknown")
                noise = gn_data.get("noise", False)
                riot = gn_data.get("riot", False)
                note_content = (
                    f"## GreyNoise Enrichment\n"
                    f"- Classification: {classification}\n"
                    f"- Internet Noise: {noise}\n"
                    f"- RIOT (Benign Service): {riot}\n"
                    f"- Name: {gn_data.get('name', 'N/A')}\n"
                    f"- Last Seen: {gn_data.get('last_seen', 'N/A')}"
                )
                note = Note(
                    content=note_content,
                    object_refs=[stix_id],
                    abstract=f"GreyNoise: {classification}",
                    allow_custom=True,
                )
                objects.append(note)

                # Add labels based on classification
                if classification == "malicious":
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:malicious"
                    )
                elif riot:
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:benign-service"
                    )
        except Exception as e:
            self.helper.log_error(f"GreyNoise enrichment failed: {e}")
        return objects

    def _enrich_domain(self, domain, stix_id):
        """Enrich a domain with DNS context from SecurityTrails."""
        objects = []
        try:
            # Use SecurityTrails API for domain enrichment
            st_response = requests.get(
                f"https://api.securitytrails.com/v1/domain/{domain}",
                headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
                timeout=30,
            )
            if st_response.status_code == 200:
                st_data = st_response.json()
                current_dns = st_data.get("current_dns", {})
                # Skip records without an "ip" field so the join below cannot fail.
                a_records = [
                    r["ip"]
                    for r in current_dns.get("a", {}).get("values", [])
                    if r.get("ip")
                ]
                note_content = (
                    f"## SecurityTrails Enrichment\n"
                    f"- A Records: {', '.join(a_records) or 'N/A'}\n"
                    f"- Alexa Rank: {st_data.get('alexa_rank', 'N/A')}\n"
                    f"- Hostname: {st_data.get('hostname', 'N/A')}"
                )
                note = Note(
                    content=note_content,
                    object_refs=[stix_id],
                    abstract=f"SecurityTrails: {domain}",
                    allow_custom=True,
                )
                objects.append(note)
        except Exception as e:
            self.helper.log_error(f"SecurityTrails enrichment failed: {e}")
        return objects


if __name__ == "__main__":
    connector = CustomEnrichmentConnector()
    connector.start()
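The VirusTotal and Shodan services above set a `*_MAX_TLP` limit, but the custom connector sends every observable to GreyNoise and SecurityTrails regardless of its marking. A minimal sketch of a comparable gate follows; the `TLP_ORDER` table and the `tlp_allowed` function are hypothetical names written for this example. pycti appears to ship a similar helper (`OpenCTIConnectorHelper.check_max_tlp`), which is worth checking before rolling your own.

```python
# Ranking of TLP markings from least to most restrictive.
# TLP:WHITE is the older name for TLP:CLEAR, so both share rank 0.
TLP_ORDER = {
    "TLP:CLEAR": 0,
    "TLP:WHITE": 0,
    "TLP:GREEN": 1,
    "TLP:AMBER": 2,
    "TLP:AMBER+STRICT": 3,
    "TLP:RED": 4,
}


def tlp_allowed(marking, max_tlp):
    """Return True when an observable's marking is at or below the configured limit.

    Unknown markings are rejected, so unexpected values fail closed.
    """
    try:
        return TLP_ORDER[marking.upper()] <= TLP_ORDER[max_tlp.upper()]
    except KeyError:
        return False


green_ok = tlp_allowed("TLP:GREEN", "TLP:AMBER")
red_ok = tlp_allowed("TLP:RED", "TLP:AMBER")
```

In the connector, a check like this would run at the top of the message handler, returning early with a short status message when the observable's marking exceeds the limit, so restricted values never leave the platform.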