skills/building-threat-intelligence-feed-integration/SKILL.md
构建自动化威胁情报(Threat Intelligence)源集成管道,将 STIX/TAXII 源、 开源威胁情报和商业 TI 平台接入 SIEM 和安全工具,实现实时 IOC 匹配和告警。 适用于 SOC 团队需要通过自动化源接入、标准化、评分和分发到检测系统来 将威胁情报付诸实践的场景。
npx skillsauth add killvxk/cybersecurity-skills-zh building-threat-intelligence-feed-integrationInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
以下情况使用本技能:
不适用于手动 IOC 查询——对于临时查询,请使用专用富化工具(VirusTotal、AbuseIPDB)。
taxii2-client、stix2 Python 包)按类型、格式和更新频率映射可用源:
| 情报源 | 格式 | IOC 类型 | 更新频率 | 费用 | |-------------|--------|-----------|-------------|------| | AlienVault OTX | STIX/JSON | IP、域名、哈希、URL | 实时 | 免费 | | Abuse.ch URLhaus | CSV/JSON | URL、域名 | 每 5 分钟 | 免费 | | Abuse.ch MalwareBazaar | JSON API | 文件哈希 | 实时 | 免费 | | CISA AIS | STIX/TAXII 2.1 | 全类型 | 每日 | 免费(美国政府) | | CrowdStrike Intel | STIX/JSON | 全类型 + 威胁行为者 TTP | 实时 | 商业 | | Mandiant Advantage | STIX 2.1 | 全类型 + 报告 | 实时 | 商业 |
连接到 TAXII 2.1 服务器并下载指标:
from taxii2client.v21 import Server, Collection
from stix2 import parse
# 连接到 TAXII 服务器(示例:CISA AIS)
server = Server(
"https://taxii.cisa.gov/taxii2/",
user="your_username",
password="your_password"
)
# 列出可用集合
for api_root in server.api_roots:
print(f"API Root: {api_root.title}")
for collection in api_root.collections:
print(f" Collection: {collection.title} (ID: {collection.id})")
# 从集合获取指标
collection = Collection(
"https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
user="your_username",
password="your_password"
)
# 获取过去 24 小时添加的指标
from datetime import datetime, timedelta
added_after = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
response = collection.get_objects(added_after=added_after, type=["indicator"])
for obj in response.get("objects", []):
indicator = parse(obj)
print(f"Type: {indicator.type}")
print(f"Pattern: {indicator.pattern}")
print(f"Valid Until: {indicator.valid_until}")
print(f"Confidence: {indicator.confidence}")
print("---")
Abuse.ch URLhaus 源:
import requests
import csv
from io import StringIO
# 下载 URLhaus 近期 URL
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/")
reader = csv.reader(StringIO(response.text), delimiter=',')
indicators = []
for row in reader:
if row[0].startswith("#"):
continue
indicators.append({
"id": row[0],
"dateadded": row[1],
"url": row[2],
"url_status": row[3],
"threat": row[5],
"tags": row[6]
})
print(f"从 URLhaus 接入了 {len(indicators)} 条 URL")
# 仅过滤活跃威胁
active = [i for i in indicators if i["url_status"] == "online"]
print(f"活跃威胁:{len(active)} 条")
AlienVault OTX Pulse 源:
from OTXv2 import OTXv2, IndicatorTypes
otx = OTXv2("YOUR_OTX_API_KEY")
# 获取订阅的 pulse(过去 24 小时)
pulses = otx.getall(modified_since="2024-03-14T00:00:00")
for pulse in pulses:
print(f"Pulse: {pulse['name']}")
print(f"Tags: {pulse['tags']}")
for indicator in pulse["indicators"]:
print(f" IOC: {indicator['indicator']} ({indicator['type']})")
Abuse.ch Feodo Tracker(C2 IP):
response = requests.get("https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json")
c2_data = response.json()
for entry in c2_data:
print(f"IP: {entry['ip_address']}:{entry['port']}")
print(f"Malware: {entry['malware']}")
print(f"First Seen: {entry['first_seen']}")
print(f"Last Online: {entry['last_online']}")
将所有源转换为 STIX 2.1 格式以实现标准化:
from stix2 import Indicator, Bundle
import hashlib
def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
"""将原始 IOC 转换为 STIX 2.1 指标"""
pattern_map = {
"ipv4": f"[ipv4-addr:value = '{ioc_value}']",
"domain": f"[domain-name:value = '{ioc_value}']",
"url": f"[url:value = '{ioc_value}']",
"sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']",
"md5": f"[file:hashes.MD5 = '{ioc_value}']",
}
return Indicator(
name=f"{ioc_type}: {ioc_value}",
pattern=pattern_map[ioc_type],
pattern_type="stix",
valid_from="2024-03-15T00:00:00Z",
confidence=confidence,
labels=[source],
custom_properties={"x_source_feed": source}
)
# 跨源去重
seen_iocs = set()
unique_indicators = []
for ioc in all_collected_iocs:
ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest()
if ioc_hash not in seen_iocs:
seen_iocs.add(ioc_hash)
unique_indicators.append(
create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
)
bundle = Bundle(objects=unique_indicators)
print(f"唯一指标数:{len(unique_indicators)}")
推送至 Splunk ES 威胁情报:
import requests
splunk_url = "https://splunk.company.com:8089"
headers = {"Authorization": f"Bearer {splunk_token}"}
for indicator in unique_indicators:
# 从 STIX 模式中提取 IOC 值
ioc_value = indicator.pattern.split("'")[1]
# 上传至 Splunk ES 威胁情报集合
data = {
"ip": ioc_value,
"description": indicator.name,
"weight": indicator.confidence // 10,
"threat_key": indicator.id,
"source_feed": indicator.get("x_source_feed", "unknown")
}
requests.post(
f"{splunk_url}/services/data/threat_intel/item/ip_intel",
headers=headers, data=data, verify=False
)
推送至 MISP 进行集中管理:
from pymisp import PyMISP, MISPEvent, MISPAttribute
misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")
# 为源批次创建事件
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2 # 中等
event.analysis = 2 # 已完成
# 将指标作为属性添加
for ioc in unique_indicators:
attr = MISPAttribute()
attr.type = "ip-dst" if "ipv4" in ioc.pattern else "domain"
attr.value = ioc.pattern.split("'")[1]
attr.to_ids = True
attr.comment = f"Source: {ioc.get('x_source_feed', 'mixed')}"
event.add_attribute(**attr)
result = misp.add_event(event)
print(f"MISP 事件已创建:{result['Event']['id']}")
跟踪源有效性指标:
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
dc(threat_key) AS unique_iocs,
dc(source_feed) AS feed_count
by source_feed
| join source_feed [
search index=notable source="Threat Intelligence"
| stats count AS matches by source_feed
]
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
| 术语 | 定义 | |------|-----------| | STIX 2.1 | 结构化威胁信息表达——用于共享威胁情报对象的标准化 JSON 格式 | | TAXII | 指标信息可信自动化交换——通过 REST API 共享 STIX 数据的传输协议 | | TIP | 威胁情报平台——用于聚合、评分和分发威胁情报的集中系统 | | IOC 评分(IOC Scoring) | 根据源可靠性和相互印证为指标分配可信度值的过程 | | 源去重(Feed Deduplication) | 跨多个源删除重复 IOC,同时保留多源归因信息 | | IOC 过期(IOC Expiration) | 删除过时指标的生存时间策略(IP:30 天,域名:90 天,哈希:1 年) |
威胁情报源状态 — 每日报告
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
日期: 2024-03-15
IOC 总数: 45,892 个活跃指标
源健康状态:
源名称 IOC 数 匹配数 匹配率 状态
Abuse.ch URLhaus 12,340 47 0.38% 健康
AlienVault OTX 18,567 23 0.12% 健康
Abuse.ch Feodo 1,203 12 1.00% 健康
CISA AIS 8,945 8 0.09% 健康
CrowdStrike Intel 4,837 31 0.64% 健康
今日操作:
新增 IOC: 1,247 条
过期 IOC: 892 条
删除重复: 156 条
SIEM 匹配: 121 个显著事件生成
误报: 3 个(CDN IP 已从源移除)
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。