skills/building-adversary-infrastructure-tracking-system/SKILL.md
构建自动化系统,利用被动 DNS、证书透明度、WHOIS 数据和 IP 富化来映射和监控威胁行为者的命令与控制(C2)网络,追踪对手基础设施。
npx skillsauth add killvxk/cybersecurity-skills-zh building-adversary-infrastructure-tracking-systemInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
对手基础设施追踪利用被动 DNS 记录、证书透明度日志、WHOIS 注册数据和 IP 富化来发现、映射和监控威胁行为者的命令与控制(C2)网络。攻击者在不同攻击活动中频繁复用托管服务商、注册商、SSL 证书和命名模式,使分析人员能够从已知指标枢纽到发现新的基础设施。本技能涵盖构建识别基础设施关系的自动化追踪系统、检测与对手模式匹配的新注册域名,以及维护持续更新的威胁行为者网络地图。
requests、dnspython、python-whois、shodan、networkx 库被动 DNS 捕获历史 DNS 解析数据,记录哪些域名解析到了哪些 IP 以及发生时间。与主动 DNS 查询不同,即使记录已更改,被动 DNS 也会保留历史关系,使分析人员能够追踪基础设施变化、识别共享托管模式,并发现历史上解析到相同 IP 的相关域名。
枢纽通过跟踪连接来识别相关基础设施:IP 枢纽(查找某 IP 上的所有域名)、域名枢纽(查找某域名解析过的所有 IP)、WHOIS 枢纽(查找同一注册人的域名)、证书枢纽(查找共享 SSL 证书的主机)以及 NS/MX 枢纽(查找使用相同域名服务器或邮件服务器的域名)。
威胁行为者表现出一定模式:偏好的注册商(Namecheap、REG.RU、Tucows)、偏好的托管(防弹托管服务商、云服务)、域名生成算法(DGA)、一致的命名模式,以及跨攻击活动复用证书。
import requests
import json
from collections import defaultdict
from datetime import datetime
class InfrastructureTracker:
def __init__(self, securitytrails_key=None, vt_key=None, shodan_key=None):
self.st_key = securitytrails_key
self.vt_key = vt_key
self.shodan_key = shodan_key
self.infrastructure_graph = defaultdict(lambda: {"nodes": set(), "edges": []})
def passive_dns_lookup(self, domain):
"""查询域名的被动 DNS 历史解析记录。"""
headers = {"apikey": self.st_key}
url = f"https://api.securitytrails.com/v1/history/{domain}/dns/a"
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
records = resp.json().get("records", [])
history = []
for record in records:
for value in record.get("values", []):
history.append({
"domain": domain,
"ip": value.get("ip", ""),
"first_seen": record.get("first_seen", ""),
"last_seen": record.get("last_seen", ""),
"type": record.get("type", "a"),
})
print(f"[+] 被动 DNS for {domain}: {len(history)} 条记录")
return history
return []
def reverse_ip_lookup(self, ip_address):
"""查找托管在某 IP 上的所有域名。"""
headers = {"apikey": self.st_key}
url = f"https://api.securitytrails.com/v1/ips/nearby/{ip_address}"
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
blocks = resp.json().get("blocks", [])
domains = []
for block in blocks:
for site in block.get("sites", []):
domains.append(site)
print(f"[+] 反向 IP 查询 {ip_address}: {len(domains)} 个域名")
return domains
return []
def whois_lookup(self, domain):
"""获取用于枢纽的 WHOIS 注册数据。"""
headers = {"apikey": self.st_key}
url = f"https://api.securitytrails.com/v1/domain/{domain}/whois"
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
data = resp.json()
whois_data = {
"domain": domain,
"registrar": data.get("registrar", ""),
"registrant_org": data.get("registrant_org", ""),
"registrant_email": data.get("registrant_email", ""),
"name_servers": data.get("nameServers", []),
"created_date": data.get("createdDate", ""),
"updated_date": data.get("updatedDate", ""),
"expires_date": data.get("expiresDate", ""),
}
return whois_data
return {}
def pivot_from_seed(self, seed_indicator, indicator_type="domain", depth=2):
"""从种子指标递归枢纽以发现基础设施。"""
discovered = {"domains": set(), "ips": set(), "relationships": []}
if indicator_type == "domain":
discovered["domains"].add(seed_indicator)
# 获取域名的 IP
pdns = self.passive_dns_lookup(seed_indicator)
for record in pdns:
ip = record["ip"]
discovered["ips"].add(ip)
discovered["relationships"].append({
"source": seed_indicator, "target": ip,
"type": "resolves_to",
"first_seen": record["first_seen"],
"last_seen": record["last_seen"],
})
if depth > 1:
# 对已发现的 IP 进行反向查询
reverse_domains = self.reverse_ip_lookup(ip)
for rd in reverse_domains[:20]:
discovered["domains"].add(rd)
discovered["relationships"].append({
"source": rd, "target": ip,
"type": "hosted_on",
})
elif indicator_type == "ip":
discovered["ips"].add(seed_indicator)
domains = self.reverse_ip_lookup(seed_indicator)
for domain in domains[:20]:
discovered["domains"].add(domain)
discovered["relationships"].append({
"source": domain, "target": seed_indicator,
"type": "hosted_on",
})
print(f"[+] 从 {seed_indicator} 枢纽: "
f"{len(discovered['domains'])} 个域名, "
f"{len(discovered['ips'])} 个 IP, "
f"{len(discovered['relationships'])} 个关系")
return discovered
tracker = InfrastructureTracker(
securitytrails_key="YOUR_ST_KEY",
vt_key="YOUR_VT_KEY",
)
import networkx as nx
class InfrastructureGraph:
def __init__(self):
self.graph = nx.Graph()
def add_discovery(self, discovery_data):
"""将已发现的基础设施添加到图中。"""
for domain in discovery_data["domains"]:
self.graph.add_node(domain, type="domain")
for ip in discovery_data["ips"]:
self.graph.add_node(ip, type="ip")
for rel in discovery_data["relationships"]:
self.graph.add_edge(
rel["source"], rel["target"],
relationship=rel["type"],
first_seen=rel.get("first_seen", ""),
last_seen=rel.get("last_seen", ""),
)
def find_clusters(self):
"""识别基础设施集群。"""
components = list(nx.connected_components(self.graph))
clusters = []
for component in components:
domains = [n for n in component if self.graph.nodes[n].get("type") == "domain"]
ips = [n for n in component if self.graph.nodes[n].get("type") == "ip"]
clusters.append({
"size": len(component),
"domains": sorted(domains),
"ips": sorted(ips),
"domain_count": len(domains),
"ip_count": len(ips),
})
clusters.sort(key=lambda x: x["size"], reverse=True)
print(f"[+] 基础设施集群: {len(clusters)} 个")
return clusters
def find_hub_nodes(self, top_n=10):
"""查找高中心性节点(共享基础设施)。"""
centrality = nx.degree_centrality(self.graph)
top_nodes = sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:top_n]
hubs = []
for node, score in top_nodes:
hubs.append({
"node": node,
"type": self.graph.nodes[node].get("type", "unknown"),
"centrality": round(score, 4),
"connections": self.graph.degree(node),
})
return hubs
def export_graph(self, output_file="infrastructure_graph.json"):
data = nx.node_link_data(self.graph)
with open(output_file, "w") as f:
json.dump(data, f, indent=2)
print(f"[+] 图已导出: {self.graph.number_of_nodes()} 个节点, "
f"{self.graph.number_of_edges()} 条边")
infra_graph = InfrastructureGraph()
discovery = tracker.pivot_from_seed("evil-domain.com", depth=2)
infra_graph.add_discovery(discovery)
clusters = infra_graph.find_clusters()
hubs = infra_graph.find_hub_nodes()
infra_graph.export_graph()
import time
class InfrastructureMonitor:
def __init__(self, tracker, known_indicators):
self.tracker = tracker
self.known = set(known_indicators)
self.alerts = []
def check_new_registrations(self, patterns):
"""检查是否有与对手模式匹配的新注册域名。"""
import re
new_domains = []
for pattern in patterns:
# 查询 SecurityTrails 查找匹配模式的新域名
headers = {"apikey": self.tracker.st_key}
url = "https://api.securitytrails.com/v1/domains/list"
params = {"include_ips": "true", "page": 1}
body = {"filter": {"keyword": pattern}}
resp = requests.post(url, headers=headers, json=body, timeout=30)
if resp.status_code == 200:
records = resp.json().get("records", [])
for record in records:
domain = record.get("hostname", "")
if domain not in self.known:
new_domains.append({
"domain": domain,
"pattern_matched": pattern,
"first_seen": datetime.now().isoformat(),
})
self.known.add(domain)
if new_domains:
print(f"[ALERT] {len(new_domains)} 个新域名匹配模式")
self.alerts.extend(new_domains)
return new_domains
def generate_infrastructure_report(self, clusters, hubs):
report = f"""# 对手基础设施追踪报告
生成时间: {datetime.now().isoformat()}
## 摘要
- 识别基础设施集群: {len(clusters)} 个
- 追踪域名总数: {sum(c['domain_count'] for c in clusters)}
- 追踪 IP 总数: {sum(c['ip_count'] for c in clusters)}
- 检测新域名: {len(self.alerts)} 个
## 顶级基础设施枢纽节点
| 节点 | 类型 | 连接数 | 中心性 |
|------|------|-------------|------------|
"""
for hub in hubs[:10]:
report += (f"| {hub['node']} | {hub['type']} "
f"| {hub['connections']} | {hub['centrality']} |\n")
report += "\n## 基础设施集群\n"
for i, cluster in enumerate(clusters[:5], 1):
report += f"\n### 集群 {i}({cluster['size']} 个节点)\n"
report += f"- 域名: {', '.join(cluster['domains'][:5])}\n"
report += f"- IP: {', '.join(cluster['ips'][:5])}\n"
with open("infrastructure_report.md", "w") as f:
f.write(report)
print("[+] 基础设施报告已保存")
monitor = InfrastructureMonitor(tracker, known_indicators=set())
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。