skills/building-ioc-enrichment-pipeline-with-opencti/SKILL.md
OpenCTI 是一个以 STIX 2.1 为原生数据模型的开源网络威胁情报知识管理平台。本技能涵盖使用 OpenCTI 连接器生态系统构建自动化 IOC 富化流水线,通过 VirusTotal、Shodan、AbuseIPDB、GreyNoise 等来源对指标进行富化。
npx skillsauth add killvxk/cybersecurity-skills-zh building-ioc-enrichment-pipeline-with-openctiInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
OpenCTI 是一个以 STIX 2.1 为原生数据模型的开源网络威胁情报知识管理平台。本技能涵盖使用 OpenCTI 连接器生态系统构建自动化 IOC 富化流水线,通过 VirusTotal、Shodan、AbuseIPDB、GreyNoise 等来源对指标进行富化。该流水线自动对新摄入的指标进行富化,将其与已知威胁行为者和攻击活动关联,并为分析师优先排序进行评分。
pycti 库OpenCTI 使用 GraphQL API 前端,以 ElasticSearch 作为存储后端,以 Redis/RabbitMQ 用于连接器通信。数据以 STIX 2.1 对象和关系的形式原生存储。连接器分为以下类别:外部导入(推送摄取)、内部导入(文件解析)、内部富化(上下文添加)和流式处理(实时导出)。
内部富化连接器在创建新可观测对象时自动触发,或由分析师手动触发。每个连接器接收 STIX 对象、查询外部服务,并返回 STIX 2.1 bundle,以附加的上下文、标签和关系扩充原始可观测对象。
OpenCTI 对指标使用 0-100 置信度等级。富化连接器可根据外部验证更新置信度分数:VirusTotal 检测率、Shodan 暴露数据、AbuseIPDB 报告数量和 GreyNoise 分类结果。
# docker-compose.yml(核心服务)
version: '3'
services:
opencti:
image: opencti/platform:6.4.4
environment:
- APP__PORT=8080
- [email protected]
- APP__ADMIN__PASSWORD=ChangeMeNow
- APP__ADMIN__TOKEN=your-admin-token-uuid
- ELASTICSEARCH__URL=http://elasticsearch:9200
- MINIO__ENDPOINT=minio
- RABBITMQ__HOSTNAME=rabbitmq
ports:
- "8080:8080"
depends_on:
- elasticsearch
- minio
- rabbitmq
- redis
connector-virustotal:
image: opencti/connector-virustotal:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-virustotal-id
- CONNECTOR_NAME=VirusTotal
- CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
- CONNECTOR_AUTO=true
- VIRUSTOTAL_TOKEN=your-vt-api-key
- VIRUSTOTAL_MAX_TLP=TLP:AMBER
connector-shodan:
image: opencti/connector-shodan:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-shodan-id
- CONNECTOR_NAME=Shodan
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- SHODAN_TOKEN=your-shodan-api-key
- SHODAN_MAX_TLP=TLP:AMBER
connector-abuseipdb:
image: opencti/connector-abuseipdb:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-abuseipdb-id
- CONNECTOR_NAME=AbuseIPDB
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- ABUSEIPDB_API_KEY=your-abuseipdb-key
import os
from pycti import OpenCTIConnectorHelper, get_config_variable
from stix2 import (
Bundle, Indicator, Note, Relationship,
IPv4Address, DomainName
)
import requests
class CustomEnrichmentConnector:
def __init__(self):
config = {
"opencti": {
"url": os.environ.get("OPENCTI_URL"),
"token": os.environ.get("OPENCTI_TOKEN"),
},
"connector": {
"id": os.environ.get("CONNECTOR_ID"),
"name": "CustomEnrichment",
"scope": "IPv4-Addr,Domain-Name,Url",
"auto": True,
"type": "INTERNAL_ENRICHMENT",
},
}
self.helper = OpenCTIConnectorHelper(config)
self.helper.listen(self._process_message)
def _process_message(self, data):
entity_id = data["entity_id"]
stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)
if not stix_object:
return "未找到可观测对象"
observable_type = stix_object["entity_type"]
observable_value = stix_object.get("value", "")
enrichment_results = []
if observable_type == "IPv4-Addr":
enrichment_results = self._enrich_ip(observable_value, entity_id)
elif observable_type == "Domain-Name":
enrichment_results = self._enrich_domain(observable_value, entity_id)
if enrichment_results:
bundle = Bundle(objects=enrichment_results, allow_custom=True)
self.helper.send_stix2_bundle(bundle.serialize())
return "富化完成"
def _enrich_ip(self, ip_address, entity_id):
"""使用 GreyNoise、AbuseIPDB 上下文富化 IP 地址。"""
objects = []
# GreyNoise Community API
try:
gn_response = requests.get(
f"https://api.greynoise.io/v3/community/{ip_address}",
headers={"key": os.environ.get("GREYNOISE_API_KEY")},
timeout=30,
)
if gn_response.status_code == 200:
gn_data = gn_response.json()
classification = gn_data.get("classification", "unknown")
noise = gn_data.get("noise", False)
riot = gn_data.get("riot", False)
note_content = (
f"## GreyNoise 富化\n"
f"- 分类: {classification}\n"
f"- 互联网噪声: {noise}\n"
f"- RIOT(良性服务): {riot}\n"
f"- 名称: {gn_data.get('name', 'N/A')}\n"
f"- 最后发现: {gn_data.get('last_seen', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"GreyNoise: {classification}",
allow_custom=True,
)
objects.append(note)
# 根据分类添加标签
if classification == "malicious":
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:malicious"
)
elif riot:
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:benign-service"
)
except Exception as e:
self.helper.log_error(f"GreyNoise 富化失败: {e}")
return objects
def _enrich_domain(self, domain, entity_id):
"""使用 WHOIS 和 DNS 上下文富化域名。"""
objects = []
try:
# 使用 SecurityTrails API 进行域名富化
st_response = requests.get(
f"https://api.securitytrails.com/v1/domain/{domain}",
headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
timeout=30,
)
if st_response.status_code == 200:
st_data = st_response.json()
current_dns = st_data.get("current_dns", {})
a_records = [
r.get("ip") for r in current_dns.get("a", {}).get("values", [])
]
note_content = (
f"## SecurityTrails 富化\n"
f"- A 记录: {', '.join(a_records)}\n"
f"- Alexa 排名: {st_data.get('alexa_rank', 'N/A')}\n"
f"- 主机名: {st_data.get('hostname', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"SecurityTrails: {domain}",
allow_custom=True,
)
objects.append(note)
except Exception as e:
self.helper.log_error(f"SecurityTrails 富化失败: {e}")
return objects
if __name__ == "__main__":
connector = CustomEnrichmentConnector()
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。