skills/building-threat-feed-aggregation-with-misp/SKILL.md
部署 MISP(恶意软件信息共享平台)来聚合、关联和分发来自多个来源的威胁情报推送,用于集中式 IOC 管理和自动化 SIEM 集成。
npx skillsauth add killvxk/cybersecurity-skills-zh building-threat-feed-aggregation-with-mispInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
MISP 是领先的开源威胁情报平台,用于收集、存储、分发和共享网络安全指标和威胁情报。它将来自 OSINT 来源、商业提供商和共享社区的推送聚合到统一平台,具备自动关联、STIX/TAXII 导出,以及与 SIEM 和安全工具的直接集成。本技能涵盖通过 Docker 部署 MISP、配置来自 abuse.ch、AlienVault OTX 和 CIRCL 等来源的推送、设置自动推送同步,以及与 Splunk、Elasticsearch 和 SOAR 平台集成。
pymisp 库用于 API 交互MISP 将威胁情报存储为包含属性(IOC)的事件,按类型和类别组织。事件可以有标签(MITRE ATT&CK、TLP 标记、行业标签)、Galaxy(威胁行为者档案、恶意软件家族、攻击模式)和对象(相关属性的结构化分组)。事件在实例间自动关联。
MISP 支持三种推送格式:MISP 格式(原生 JSON 事件)、CSV(逗号分隔的 IOC)和自由文本(非结构化文本,自动提取 IOC)。推送可以是远程的(从 URL 获取)或本地的(上传文件)。MISP 内置了 80 多个默认 OSINT 推送,包括 abuse.ch URLhaus、Botvrij、CIRCL OSINT 和恶意软件流量分析。
MISP 实例可通过推/拉机制与其他 MISP 实例同步。共享组控制分发范围(仅限本组织、本社区、已连接社区、所有社区)。TAXII 服务器模块支持与 STIX/TAXII 消费者集成。
# docker-compose.yml 用于 MISP 部署
version: '3.8'
services:
misp:
image: coolacid/misp-docker:core-latest
container_name: misp
restart: unless-stopped
ports:
- "443:443"
- "80:80"
environment:
- MYSQL_HOST=misp-db
- MYSQL_DATABASE=misp
- MYSQL_USER=misp
- MYSQL_PASSWORD=misp_db_password_change_me
- [email protected]
- MISP_ADMIN_PASSPHRASE=admin_password_change_me
- MISP_BASEURL=https://misp.organization.com
- POSTFIX_RELAY_HOST=smtp.organization.com
- TIMEZONE=UTC
volumes:
- misp-data:/var/www/MISP/app/files
- misp-config:/var/www/MISP/app/Config
depends_on:
- misp-db
- misp-redis
misp-db:
image: mysql:8.0
container_name: misp-db
restart: unless-stopped
environment:
- MYSQL_DATABASE=misp
- MYSQL_USER=misp
- MYSQL_PASSWORD=misp_db_password_change_me
- MYSQL_ROOT_PASSWORD=root_password_change_me
volumes:
- misp-db-data:/var/lib/mysql
misp-redis:
image: redis:7
container_name: misp-redis
restart: unless-stopped
volumes:
misp-data:
misp-config:
misp-db-data:
from pymisp import PyMISP, MISPFeed
import json
class MISPFeedManager:
def __init__(self, misp_url, misp_key, verify_ssl=False):
self.misp = PyMISP(misp_url, misp_key, verify_ssl)
print(f"[+] 已连接到 MISP: {misp_url}")
def list_feeds(self):
"""列出所有已配置的推送。"""
feeds = self.misp.feeds()
enabled = [f for f in feeds if f.get("Feed", {}).get("enabled")]
disabled = [f for f in feeds if not f.get("Feed", {}).get("enabled")]
print(f"[+] 推送: {len(enabled)} 个已启用, {len(disabled)} 个已禁用")
return feeds
def enable_default_feeds(self):
"""启用推荐的默认 OSINT 推送。"""
recommended_feeds = [
"CIRCL OSINT Feed",
"Botvrij.eu - Indicators of Compromise",
"abuse.ch URLhaus Host file",
"abuse.ch Feodo Tracker",
"abuse.ch SSL Blacklist",
"malwaredomainlist",
"CyberCure - IP Feed",
]
feeds = self.misp.feeds()
enabled_count = 0
for feed in feeds:
feed_data = feed.get("Feed", {})
if feed_data.get("name") in recommended_feeds:
if not feed_data.get("enabled"):
self.misp.enable_feed(feed_data["id"])
self.misp.enable_feed_cache(feed_data["id"])
enabled_count += 1
print(f" [+] 已启用: {feed_data['name']}")
print(f"[+] 共启用 {enabled_count} 个推送")
def add_custom_feed(self, name, url, provider, feed_format="csv",
input_source="network", enabled=True):
"""添加自定义威胁情报推送。"""
feed = MISPFeed()
feed.name = name
feed.provider = provider
feed.url = url
feed.source_format = feed_format
feed.input_source = input_source
feed.enabled = enabled
feed.caching_enabled = True
feed.publish = False
feed.distribution = "3" # 所有社区
result = self.misp.add_feed(feed)
if "Feed" in result:
feed_id = result["Feed"]["id"]
print(f"[+] 已添加推送: {name} (ID: {feed_id})")
return feed_id
else:
print(f"[-] 添加推送失败: {result}")
return None
def fetch_all_feeds(self):
"""触发所有已启用推送的获取。"""
feeds = self.misp.feeds()
for feed in feeds:
feed_data = feed.get("Feed", {})
if feed_data.get("enabled"):
self.misp.fetch_feed(feed_data["id"])
print(f" [*] 正在获取: {feed_data['name']}")
print("[+] 已触发所有已启用推送的获取")
manager = MISPFeedManager(
"https://misp.organization.com",
"YOUR_MISP_API_KEY",
)
manager.enable_default_feeds()
manager.add_custom_feed(
name="Abuse.ch MalwareBazaar 近期",
url="https://bazaar.abuse.ch/export/csv/recent/",
provider="abuse.ch",
feed_format="csv",
)
manager.fetch_all_feeds()
def search_indicators(misp, value=None, type_attribute=None, tags=None, last_days=30):
"""在 MISP 中搜索并关联指标。"""
from datetime import datetime, timedelta
date_from = (datetime.now() - timedelta(days=last_days)).strftime("%Y-%m-%d")
search_params = {
"date_from": date_from,
"published": True,
"enforceWarninglist": True,
}
if value:
search_params["value"] = value
if type_attribute:
search_params["type_attribute"] = type_attribute
if tags:
search_params["tags"] = tags
results = misp.search("attributes", **search_params)
attributes = results.get("Attribute", [])
print(f"[+] 搜索返回 {len(attributes)} 个属性")
# 按事件分组以获取上下文
events = {}
for attr in attributes:
event_id = attr.get("event_id", "")
if event_id not in events:
events[event_id] = {"attributes": [], "tags": set()}
events[event_id]["attributes"].append({
"type": attr.get("type", ""),
"value": attr.get("value", ""),
"category": attr.get("category", ""),
"timestamp": attr.get("timestamp", ""),
})
for tag in attr.get("Tag", []):
events[event_id]["tags"].add(tag.get("name", ""))
return {"attributes": attributes, "events": events}
# 搜索特定 IOC
misp = manager.misp
results = search_indicators(misp, value="203.0.113.1")
results_by_type = search_indicators(misp, type_attribute="ip-dst", last_days=7)
results_by_tag = search_indicators(misp, tags=["tlp:white", "type:OSINT"])
import requests
from datetime import datetime, timedelta
class MISPSIEMExporter:
def __init__(self, misp_client):
self.misp = misp_client
def export_to_splunk(self, splunk_url, hec_token, days=7):
"""通过 HEC 将近期 MISP 指标导出到 Splunk。"""
date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
results = self.misp.search("attributes", date_from=date_from,
published=True, enforceWarninglist=True)
attributes = results.get("Attribute", [])
headers = {"Authorization": f"Splunk {hec_token}"}
exported = 0
for attr in attributes:
event = {
"event": {
"ioc_type": attr.get("type", ""),
"ioc_value": attr.get("value", ""),
"category": attr.get("category", ""),
"event_id": attr.get("event_id", ""),
"timestamp": attr.get("timestamp", ""),
"tags": [t.get("name", "") for t in attr.get("Tag", [])],
},
"sourcetype": "misp:attribute",
"source": "misp",
"index": "threat_intel",
}
resp = requests.post(
f"{splunk_url}/services/collector/event",
headers=headers, json=event, verify=False,
)
if resp.status_code == 200:
exported += 1
print(f"[+] 已导出 {exported}/{len(attributes)} 个指标到 Splunk")
def export_ioc_list(self, output_file, ioc_types=None, days=30):
"""导出防火墙/代理封锁列表的平面 IOC 列表。"""
ioc_types = ioc_types or ["ip-dst", "domain", "hostname", "url"]
date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
all_iocs = []
for ioc_type in ioc_types:
results = self.misp.search(
"attributes", type_attribute=ioc_type,
date_from=date_from, published=True,
enforceWarninglist=True,
)
for attr in results.get("Attribute", []):
all_iocs.append(attr.get("value", ""))
unique_iocs = sorted(set(all_iocs))
with open(output_file, "w") as f:
for ioc in unique_iocs:
f.write(f"{ioc}\n")
print(f"[+] 已导出 {len(unique_iocs)} 个唯一 IOC 到 {output_file}")
exporter = MISPSIEMExporter(misp)
exporter.export_ioc_list("blocklist_ips.txt", ioc_types=["ip-dst"], days=7)
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。