skills/analyzing-ransomware-leak-site-intelligence/SKILL.md
监控和分析勒索软件组织的数据泄露站点(DLS),追踪受害者发布情况,提取组织战术的威胁情报,并评估特定行业的勒索软件风险以实现主动防御。
npx skillsauth add killvxk/cybersecurity-skills-zh analyzing-ransomware-leak-site-intelligenceInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
采用双重勒索模式运营的勒索软件(Ransomware)组织在 Tor 隐藏服务上维护数据泄露站点(DLS),在那里发布受害者名称、被盗数据样本和倒计时器以施压付款。2025 年上半年,96 个独特勒索软件组织活跃,每月约发布 535 名受害者。监控这些站点提供了关于活跃威胁组织、目标行业、地理模式和新兴勒索软件家族的情报。本技能涵盖安全收集 DLS 情报、提取结构化数据、追踪组织活动趋势,以及生成行业特定风险评估。
requests、beautifulsoup4、pandas、matplotlib 库现代勒索软件组织在加密受害者数据之前还会将其外泄(Exfiltration)。泄露站点作为公开施压工具:受害者以倒计时器、部分数据样本和文件目录的形式被列出。若未支付赎金,完整数据将被公开。部分组织已转向三重勒索,追加 DDoS 威胁或直接联系受害者客户。
泄露站点提供:受害者识别(公司名称、行业、国家)、攻击时间线(列出时间、截止日期、数据发布时间)、数据量估算、组织能力评估(目标行业、攻击频率、操作节奏),以及趋势分析(新组织出现、组织品牌重塑、执法打击)。
切勿在生产环境中直接访问 DLS 站点。使用专用监控服务(Ransomwatch、DarkFeed、KELA、Flashpoint)、Tor 隔离研究虚拟机、商业威胁情报平台或社区维护的数据集。所有分析应在隔离环境中进行,并获得适当授权。
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from collections import Counter
class RansomwareIntelCollector:
"""从公开追踪来源收集勒索软件 DLS 情报。"""
RANSOMWATCH_API = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json"
RANSOMWATCH_GROUPS = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/groups.json"
def __init__(self):
self.posts = []
self.groups = []
def fetch_ransomwatch_data(self):
"""从 ransomwatch 获取勒索软件受害者发布数据。"""
resp = requests.get(self.RANSOMWATCH_API, timeout=30)
if resp.status_code == 200:
self.posts = resp.json()
print(f"[+] 已从 ransomwatch 加载 {len(self.posts)} 条受害者记录")
else:
print(f"[-] 获取记录失败: {resp.status_code}")
resp = requests.get(self.RANSOMWATCH_GROUPS, timeout=30)
if resp.status_code == 200:
self.groups = resp.json()
print(f"[+] 已加载 {len(self.groups)} 个勒索软件组织画像")
return self.posts
def get_recent_victims(self, days=30):
"""获取最近 N 天内发布的受害者。"""
cutoff = datetime.now() - timedelta(days=days)
recent = []
for post in self.posts:
try:
discovered = datetime.fromisoformat(
post.get("discovered", "").replace("Z", "+00:00")
)
if discovered.replace(tzinfo=None) >= cutoff:
recent.append(post)
except (ValueError, TypeError):
continue
print(f"[+] 最近 {days} 天内 {len(recent)} 名受害者")
return recent
def get_group_activity(self, group_name):
"""获取特定勒索软件组织的所有发布记录。"""
group_posts = [
p for p in self.posts
if p.get("group_name", "").lower() == group_name.lower()
]
print(f"[+] {group_name}: 共 {len(group_posts)} 名受害者")
return group_posts
collector = RansomwareIntelCollector()
collector.fetch_ransomwatch_data()
recent = collector.get_recent_victims(days=30)
def analyze_group_trends(posts, top_n=15):
"""分析勒索软件组织活动趋势。"""
group_counts = Counter(p.get("group_name", "unknown") for p in posts)
monthly_activity = {}
for post in posts:
try:
date = datetime.fromisoformat(
post.get("discovered", "").replace("Z", "+00:00")
)
month_key = date.strftime("%Y-%m")
group = post.get("group_name", "unknown")
if month_key not in monthly_activity:
monthly_activity[month_key] = Counter()
monthly_activity[month_key][group] += 1
except (ValueError, TypeError):
continue
analysis = {
"total_posts": len(posts),
"unique_groups": len(group_counts),
"top_groups": group_counts.most_common(top_n),
"monthly_totals": {
month: sum(counts.values())
for month, counts in sorted(monthly_activity.items())
},
"monthly_top_groups": {
month: counts.most_common(5)
for month, counts in sorted(monthly_activity.items())
},
}
print(f"\n=== 勒索软件组织活动 ===")
print(f"追踪受害者总数: {analysis['total_posts']}")
print(f"活跃组织数量: {analysis['unique_groups']}")
print(f"\n前 {top_n} 活跃组织:")
for group, count in analysis["top_groups"]:
print(f" {group}: {count} 名受害者")
return analysis
trends = analyze_group_trends(collector.posts)
def assess_sector_risk(posts, target_sector=None, target_country=None):
"""评估特定行业或地区的勒索软件风险。"""
sector_data = {}
country_data = {}
for post in posts:
# 提取行业(并非所有情报源都包含此字段)
sector = post.get("sector", post.get("industry", "unknown"))
country = post.get("country", "unknown")
if sector not in sector_data:
sector_data[sector] = {"count": 0, "groups": Counter(), "recent": []}
sector_data[sector]["count"] += 1
sector_data[sector]["groups"][post.get("group_name", "")] += 1
if country not in country_data:
country_data[country] = {"count": 0, "groups": Counter()}
country_data[country]["count"] += 1
country_data[country]["groups"][post.get("group_name", "")] += 1
# 行业风险评分
total = len(posts)
risk_assessment = {
"total_victims": total,
"sectors": {},
"countries": {},
}
for sector, data in sorted(sector_data.items(), key=lambda x: -x[1]["count"]):
pct = (data["count"] / total * 100) if total > 0 else 0
risk_assessment["sectors"][sector] = {
"victim_count": data["count"],
"percentage": round(pct, 1),
"top_groups": data["groups"].most_common(5),
"risk_level": (
"critical" if pct > 15
else "high" if pct > 8
else "medium" if pct > 3
else "low"
),
}
for country, data in sorted(country_data.items(), key=lambda x: -x[1]["count"]):
pct = (data["count"] / total * 100) if total > 0 else 0
risk_assessment["countries"][country] = {
"victim_count": data["count"],
"percentage": round(pct, 1),
"top_groups": data["groups"].most_common(5),
}
return risk_assessment
risk = assess_sector_risk(collector.posts)
def track_new_groups(posts, lookback_days=90):
"""识别新出现的勒索软件组织。"""
group_first_seen = {}
for post in posts:
group = post.get("group_name", "")
try:
date = datetime.fromisoformat(
post.get("discovered", "").replace("Z", "+00:00")
)
if group not in group_first_seen or date < group_first_seen[group]["first_seen"]:
group_first_seen[group] = {
"first_seen": date,
"first_victim": post.get("post_title", ""),
}
except (ValueError, TypeError):
continue
cutoff = datetime.now() - timedelta(days=lookback_days)
new_groups = {
group: info for group, info in group_first_seen.items()
if info["first_seen"].replace(tzinfo=None) >= cutoff
}
# 统计每个新组织的受害者总数
for group in new_groups:
victims = [p for p in posts if p.get("group_name") == group]
new_groups[group]["total_victims"] = len(victims)
new_groups[group]["avg_per_month"] = round(
len(victims) / max(1, lookback_days / 30), 1
)
print(f"\n=== 新组织(最近 {lookback_days} 天)===")
for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"]):
print(f" {group}: {info['total_victims']} 名受害者, "
f"首次发现 {info['first_seen'].strftime('%Y-%m-%d')}")
return new_groups
new_groups = track_new_groups(collector.posts, lookback_days=90)
def generate_ransomware_intel_report(trends, risk, new_groups):
"""生成勒索软件威胁情报报告。"""
report = f"""# 勒索软件威胁情报报告
生成时间: {datetime.now().isoformat()}
## 执行摘要
- **追踪受害者总数**: {trends['total_posts']}
- **活跃勒索软件组织**: {trends['unique_groups']}
- **新兴组织(最近 90 天)**: {len(new_groups)}
## 最活跃组织
| 排名 | 组织 | 受害者数 |
|------|-------|---------|
"""
for i, (group, count) in enumerate(trends["top_groups"][:10], 1):
report += f"| {i} | {group} | {count} |\n"
report += "\n## 新兴组织\n"
for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"])[:10]:
report += f"- **{group}**: {info['total_victims']} 名受害者,首次出现于 {info['first_seen'].strftime('%Y-%m-%d')}\n"
report += "\n## 行业风险评估\n"
report += "| 行业 | 受害者数 | 占比 | 风险级别 |\n|--------|---------|---|------------|\n"
for sector, data in list(risk["sectors"].items())[:10]:
report += f"| {sector} | {data['victim_count']} | {data['percentage']}% | {data['risk_level'].upper()} |\n"
report += """
## 建议措施
1. 每日监控 DLS 情报,关注您的组织及供应链合作伙伴
2. 优先修补被最活跃组织利用的漏洞
3. 实施离线备份策略以降低勒索杠杆
4. 针对勒索软件场景开展桌面演练
5. 与行业 ISAC 和威胁共享社区共享指标
"""
with open("ransomware_intel_report.md", "w") as f:
f.write(report)
print("[+] 报告已保存: ransomware_intel_report.md")
return report
generate_ransomware_intel_report(trends, risk, new_groups)
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。