skills/building-ioc-defanging-and-sharing-pipeline/SKILL.md
构建自动化流水线,对失陷指标(URL、IP、域名、邮件)进行去危化处理以便安全共享,并通过 TAXII 推送和威胁情报平台以 STIX 格式分发。
npx skillsauth add killvxk/cybersecurity-skills-zh building-ioc-defanging-and-sharing-pipelineInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
IOC 去危化(Defanging)将潜在恶意指标(URL、IP 地址、域名、电子邮件地址)进行修改,防止意外点击或执行,同时保持可读性以便分析和共享。本技能涵盖构建自动化流水线:从多个来源摄取原始 IOC、规范化和去重、对人工查阅进行去危化处理、转换为 STIX 2.1 格式以便机器处理,并通过 TAXII 服务器、MISP 实例和邮件报告进行分发。
defang、ioc-fanger、stix2、requests、validators 库去危化替换活跃协议和域名组件以防止执行:http:// 变为 hxxp://,https:// 变为 hxxps://,域名/IP 中的点变为 [.],邮件中的 @ 变为 [@]。这对于在报告、邮件、Slack 频道和粘贴站点中共享 IOC 至关重要——在这些场景下自动链接可能触发对恶意基础设施的网络连接。
来自不同来源的原始 IOC 格式不一致。规范化包括:转换为小写、去除尾部斜杠和空白、从 URL 中提取域名、解析 URL 编码、验证格式正确性,以及跨来源去重。
STIX 模式以标准化格式表达 IOC:[ipv4-addr:value = '203.0.113.1']、[domain-name:value = 'malicious.example.com']、[url:value = 'http://evil.com/payload']、[file:hashes.'SHA-256' = 'abc123...']。每个指标包含 valid_from、indicator_types、confidence 和可选的 TLP 标记。
import re
import hashlib
from urllib.parse import urlparse, unquote
from datetime import datetime
class IOCExtractor:
"""从文本中提取并规范化 IOC。"""
PATTERNS = {
"ipv4": r'\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b',
"domain": r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b',
"url": r'https?://[^\s<>"{}|\\^`\[\]]+',
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"md5": r'\b[a-fA-F0-9]{32}\b',
"sha1": r'\b[a-fA-F0-9]{40}\b',
"sha256": r'\b[a-fA-F0-9]{64}\b',
}
WHITELIST_DOMAINS = {
"google.com", "microsoft.com", "amazon.com", "github.com",
"cloudflare.com", "akamai.com", "example.com",
}
def extract_from_text(self, text):
"""从自由文本中提取所有类型的 IOC。"""
# 先对已去危化的指标进行还原
text = self._refang(text)
iocs = {"ipv4": set(), "domain": set(), "url": set(),
"email": set(), "md5": set(), "sha1": set(), "sha256": set()}
for ioc_type, pattern in self.PATTERNS.items():
matches = re.findall(pattern, text)
for match in matches:
normalized = self._normalize(match, ioc_type)
if normalized and not self._is_whitelisted(normalized, ioc_type):
iocs[ioc_type].add(normalized)
# 移除已包含在 URL 中的域名
url_domains = set()
for url in iocs["url"]:
parsed = urlparse(url)
url_domains.add(parsed.netloc)
iocs["domain"] -= url_domains
total = sum(len(v) for v in iocs.values())
print(f"[+] 从文本中提取到 {total} 个唯一 IOC")
return {k: sorted(v) for k, v in iocs.items()}
def _refang(self, text):
"""将去危化后的指标还原为活跃形式。"""
text = text.replace("hxxp://", "http://").replace("hxxps://", "https://")
text = text.replace("[.]", ".").replace("[@]", "@")
text = text.replace("[://]", "://").replace("(.)", ".")
return text
def _normalize(self, value, ioc_type):
"""规范化 IOC 值。"""
value = value.strip().lower()
if ioc_type == "url":
value = unquote(value).rstrip("/")
elif ioc_type == "domain":
value = value.rstrip(".")
return value
def _is_whitelisted(self, value, ioc_type):
"""检查 IOC 是否在白名单中。"""
if ioc_type == "domain":
return value in self.WHITELIST_DOMAINS
if ioc_type == "url":
parsed = urlparse(value)
return parsed.netloc in self.WHITELIST_DOMAINS
return False
extractor = IOCExtractor()
sample_text = """
恶意软件 C2: hxxps://evil-domain[.]com/beacon
从 192.168.1.100 下载载荷并联系 10[.]0[.]0[.]1
SHA256: 275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f
钓鱼邮件来自 attacker[@]phishing-domain[.]com
"""
iocs = extractor.extract_from_text(sample_text)
class IOCDefanger:
"""对 IOC 进行去危化处理,用于报告和通信中的安全共享。"""
def defang_url(self, url):
return url.replace("http://", "hxxp://").replace("https://", "hxxps://").replace(".", "[.]")
def defang_domain(self, domain):
return domain.replace(".", "[.]")
def defang_ip(self, ip):
return ip.replace(".", "[.]")
def defang_email(self, email):
return email.replace("@", "[@]").replace(".", "[.]")
def defang_all(self, iocs):
"""对字典中的所有 IOC 进行去危化处理。"""
defanged = {}
for ioc_type, values in iocs.items():
if ioc_type == "url":
defanged[ioc_type] = [self.defang_url(v) for v in values]
elif ioc_type == "domain":
defanged[ioc_type] = [self.defang_domain(v) for v in values]
elif ioc_type == "ipv4":
defanged[ioc_type] = [self.defang_ip(v) for v in values]
elif ioc_type == "email":
defanged[ioc_type] = [self.defang_email(v) for v in values]
else:
defanged[ioc_type] = values # 哈希值无需去危化
return defanged
def generate_sharing_report(self, iocs, defanged, report_name="IOC 报告"):
"""生成人类可读的去危化 IOC 报告。"""
report = f"# {report_name}\n"
report += f"生成时间: {datetime.now().isoformat()}\n\n"
for ioc_type in ["url", "domain", "ipv4", "email", "sha256", "sha1", "md5"]:
values = defanged.get(ioc_type, [])
if values:
report += f"## {ioc_type.upper()} ({len(values)} 个)\n"
for v in values:
report += f"- `{v}`\n"
report += "\n"
return report
defanger = IOCDefanger()
defanged = defanger.defang_all(iocs)
report = defanger.generate_sharing_report(iocs, defanged, "恶意软件攻击活动 IOC")
print(report)
from stix2 import Indicator, Bundle, TLP_WHITE, TLP_GREEN, TLP_AMBER
from datetime import datetime
class STIXConverter:
"""将原始 IOC 转换为 STIX 2.1 指标对象。"""
TLP_MAP = {"white": TLP_WHITE, "green": TLP_GREEN, "amber": TLP_AMBER}
def iocs_to_stix(self, iocs, tlp="green", confidence=75):
"""将 IOC 字典转换为 STIX 2.1 bundle。"""
stix_objects = []
marking = self.TLP_MAP.get(tlp, TLP_GREEN)
for ip in iocs.get("ipv4", []):
stix_objects.append(Indicator(
name=f"恶意 IP: {ip}",
pattern=f"[ipv4-addr:value = '{ip}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for domain in iocs.get("domain", []):
stix_objects.append(Indicator(
name=f"恶意域名: {domain}",
pattern=f"[domain-name:value = '{domain}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for url in iocs.get("url", []):
escaped = url.replace("'", "\\'")
stix_objects.append(Indicator(
name=f"恶意 URL: {url[:60]}",
pattern=f"[url:value = '{escaped}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
for sha256 in iocs.get("sha256", []):
stix_objects.append(Indicator(
name=f"恶意文件哈希: {sha256[:16]}...",
pattern=f"[file:hashes.'SHA-256' = '{sha256}']",
pattern_type="stix",
valid_from=datetime.now(),
indicator_types=["malicious-activity"],
confidence=confidence,
object_marking_refs=[marking],
))
bundle = Bundle(objects=stix_objects)
print(f"[+] 已创建包含 {len(stix_objects)} 个指标的 STIX bundle")
return bundle
converter = STIXConverter()
stix_bundle = converter.iocs_to_stix(iocs, tlp="amber", confidence=80)
with open("iocs_stix_bundle.json", "w") as f:
f.write(stix_bundle.serialize(pretty=True))
import requests
import json
class IOCDistributor:
"""通过各种渠道分发 IOC。"""
def push_to_misp(self, iocs, misp_url, misp_key, event_info):
"""将 IOC 作为新事件推送到 MISP。"""
headers = {
"Authorization": misp_key,
"Content-Type": "application/json",
"Accept": "application/json",
}
event = {
"Event": {
"info": event_info,
"distribution": "1", # 仅限本社区
"threat_level_id": "2", # 中等
"analysis": "2", # 已完成
"Attribute": [],
}
}
type_mapping = {
"ipv4": "ip-dst",
"domain": "domain",
"url": "url",
"email": "email-src",
"md5": "md5",
"sha1": "sha1",
"sha256": "sha256",
}
for ioc_type, values in iocs.items():
misp_type = type_mapping.get(ioc_type)
if misp_type:
for value in values:
event["Event"]["Attribute"].append({
"type": misp_type,
"value": value,
"category": "Network activity" if ioc_type in ("ipv4", "domain", "url") else "Payload delivery",
"to_ids": True,
})
resp = requests.post(
f"{misp_url}/events",
headers=headers,
json=event,
verify=False,
)
if resp.status_code == 200:
event_id = resp.json().get("Event", {}).get("id", "")
print(f"[+] MISP 事件已创建: {event_id}")
return event_id
else:
print(f"[-] MISP 错误: {resp.status_code} - {resp.text[:200]}")
return None
def push_to_taxii(self, stix_bundle, taxii_url, collection_id, username, password):
"""将 STIX bundle 推送到 TAXII 2.1 集合。"""
from taxii2client.v21 import Collection
collection = Collection(
f"{taxii_url}/collections/{collection_id}/",
user=username, password=password,
)
response = collection.add_objects(stix_bundle.serialize())
print(f"[+] TAXII: 已发布 bundle,状态: {response.status}")
return response
distributor = IOCDistributor()
distributor.push_to_misp(
iocs,
misp_url="https://misp.organization.com",
misp_key="YOUR_MISP_API_KEY",
event_info="恶意软件攻击活动 IOC - 2025",
)
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。