skills/building-threat-intelligence-enrichment-in-splunk/SKILL.md
使用查询表、模块化输入和威胁情报框架,在 Splunk Enterprise Security 中构建自动化威胁情报富化流水线
npx skillsauth add killvxk/cybersecurity-skills-zh building-threat-intelligence-enrichment-in-splunkInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Splunk Enterprise Security 中的威胁情报(Threat Intelligence)框架使 SOC 团队能够自动将失陷指标(IOC)与安全事件进行关联。该框架摄取威胁情报源,将指标规范化存储到 KV Store 集合中,并通过基于查询表的关联搜索标记匹配事件。Splunk 威胁情报管理集中整合来自多个来源的收集、规范化和富化流程,为分析师提供即时上下文,从而缩短分诊时间。
外部 TI 来源(STIX/TAXII、CSV、API)
|
v
模块化输入(下载并解析情报源)
|
v
KV Store 集合(规范化 IOC 存储)
|-- ip_intel
|-- domain_intel
|-- file_intel
|-- url_intel
|-- email_intel
|
v
威胁情报查询表
|
v
关联搜索(将事件与 IOC 匹配)
|
v
Notable 事件(已富化 TI 上下文)
# inputs.conf - TAXII 情报源配置
[threatlist://taxii_feed_example]
description = TAXII 2.1 Threat Feed
type = taxii
url = https://threatfeed.example.com/taxii2/
collection = threat-indicators-v21
polling_interval = 3600
api_key = <encrypted_api_key>
disabled = false
# inputs.conf - CSV 威胁列表
[threatlist://custom_blocklist]
description = 内部威胁封锁列表
type = csv
url = https://internal.company.com/threat-feeds/blocklist.csv
polling_interval = 1800
disabled = false
# bin/threatfeed_otx.py - OTX AlienVault 情报源采集器
import json
import sys
import requests
from splunklib.modularinput import Script, Scheme, Argument, Event
class OTXFeedInput(Script):
def get_scheme(self):
scheme = Scheme("OTX AlienVault 情报源")
scheme.description = "从 AlienVault OTX 采集 IOC"
scheme.use_external_validation = False
scheme.streaming_mode = Scheme.streaming_mode_xml
api_key_arg = Argument("api_key")
api_key_arg.data_type = Argument.data_type_string
api_key_arg.required_on_create = True
scheme.add_argument(api_key_arg)
pulse_days_arg = Argument("pulse_days")
pulse_days_arg.data_type = Argument.data_type_number
pulse_days_arg.required_on_create = False
scheme.add_argument(pulse_days_arg)
return scheme
def stream_events(self, inputs, ew):
for input_name, input_item in inputs.inputs.items():
api_key = input_item["api_key"]
pulse_days = int(input_item.get("pulse_days", 30))
headers = {"X-OTX-API-KEY": api_key}
url = f"https://otx.alienvault.com/api/v1/pulses/subscribed?modified_since={pulse_days}d"
try:
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()
data = response.json()
for pulse in data.get("results", []):
for indicator in pulse.get("indicators", []):
event = Event()
event.stanza = input_name
event.data = json.dumps({
"indicator": indicator["indicator"],
"type": indicator["type"],
"pulse_name": pulse["name"],
"pulse_id": pulse["id"],
"description": indicator.get("description", ""),
"created": indicator.get("created", ""),
"threat_source": "OTX",
"confidence": pulse.get("adversary", "unknown"),
})
ew.write_event(event)
except requests.RequestException as e:
ew.log("ERROR", f"OTX 情报源采集失败:{str(e)}")
if __name__ == "__main__":
sys.exit(OTXFeedInput().run(sys.argv))
# collections.conf
[ip_threat_intel]
field.ip = string
field.threat_type = string
field.confidence = number
field.source = string
field.description = string
field.first_seen = time
field.last_seen = time
field.severity = string
[domain_threat_intel]
field.domain = string
field.threat_type = string
field.confidence = number
field.source = string
field.whois_registrar = string
field.whois_created = string
[file_hash_intel]
field.file_hash = string
field.hash_type = string
field.malware_family = string
field.confidence = number
field.source = string
field.detection_names = string
# transforms.conf
[ip_threat_intel_lookup]
external_type = kvstore
collection = ip_threat_intel
fields_list = ip, threat_type, confidence, source, description, severity
[domain_threat_intel_lookup]
external_type = kvstore
collection = domain_threat_intel
fields_list = domain, threat_type, confidence, source
[file_hash_intel_lookup]
external_type = kvstore
collection = file_hash_intel
fields_list = file_hash, hash_type, malware_family, confidence, source
| tstats summariesonly=true count from datamodel=Network_Traffic
where All_Traffic.action=allowed
by All_Traffic.src_ip, All_Traffic.dest_ip, All_Traffic.dest_port, _time span=5m
| rename "All_Traffic.*" as *
| lookup ip_threat_intel_lookup ip as dest_ip OUTPUT threat_type, confidence, source as ti_source, severity as ti_severity
| where isnotnull(threat_type)
| lookup asset_lookup ip as src_ip OUTPUT asset_name, asset_owner, asset_priority
| eval urgency=case(
ti_severity=="critical" AND asset_priority=="critical", "critical",
ti_severity=="high" OR asset_priority=="critical", "high",
ti_severity=="medium", "medium",
true(), "low"
)
| eval description="来自 ".src_ip." (".asset_name.") 向已知恶意 IP ".dest_ip." (".threat_type.") 发起连接 - 来源:".ti_source
index=dns sourcetype=stream:dns query_type=A OR query_type=AAAA
| lookup domain_threat_intel_lookup domain as query OUTPUT threat_type as domain_threat, confidence as domain_confidence, source as ti_source
| where isnotnull(domain_threat) AND domain_confidence > 70
| stats count dc(src_ip) as unique_sources values(src_ip) as source_ips by query, domain_threat, ti_source
| eval severity=case(domain_confidence > 90, "critical", domain_confidence > 70, "high", true(), "medium")
| eval description="来自 ".unique_sources." 台主机的 DNS 查询指向恶意域名 ".query." - 威胁类型:".domain_threat
index=endpoint sourcetype=sysmon EventCode=1
| lookup file_hash_intel_lookup file_hash as Hashes OUTPUT malware_family, confidence as hash_confidence, source as ti_source
| where isnotnull(malware_family)
| stats count values(ParentCommandLine) as parent_commands by Computer, User, Image, malware_family, ti_source
| eval severity="critical"
| eval description="已知恶意软件 ".malware_family." 在 ".Computer." 上由 ".User." 执行 - 二进制:".Image
index=firewall sourcetype=pan:traffic action=allowed
| eval indicators=mvappend(src_ip, dest_ip)
| mvexpand indicators
| lookup ip_threat_intel_lookup ip as indicators OUTPUT threat_type as ip_threat, confidence as ip_confidence, source as ip_ti_source
| lookup geo_ip_lookup ip as indicators OUTPUT country, city, latitude, longitude
| lookup whois_lookup ip as indicators OUTPUT org as ip_org, asn as ip_asn
| where isnotnull(ip_threat)
| stats count
values(ip_threat) as threat_types
values(ip_ti_source) as intel_sources
values(country) as countries
values(ip_org) as organizations
latest(_time) as last_seen
earliest(_time) as first_seen
by src_ip, dest_ip, dest_port
| eval enrichment_context="威胁:".mvjoin(threat_types, ", ")." | 地理位置:".mvjoin(countries, ", ")." | 机构:".mvjoin(organizations, ", ")
| inputlookup ip_threat_intel_lookup
| stats count by source, threat_type
| sort -count
| head 20
| inputlookup ip_threat_intel_lookup
| eval age_days=round((now() - strptime(last_seen, "%Y-%m-%dT%H:%M:%S")) / 86400, 0)
| stats count avg(age_days) as avg_age_days max(age_days) as max_age_days by source
| eval status=case(avg_age_days > 30, "过期", avg_age_days > 7, "老化中", true(), "新鲜")
testing
设计并执行社会工程学渗透测试,包括钓鱼、语音钓鱼、短信钓鱼和物理借口活动,以衡量人员安全韧性并识别培训差距。
testing
主持结构化的事件后审查,以识别根本原因、记录有效和无效的措施,并提出可操作的改进建议以提升未来的事件响应能力。
testing
通过分析举报的邮件、提取指标、评估凭据受攻陷情况、在全组织范围隔离恶意邮件并修复受影响账号来响应网络钓鱼事件。涵盖邮件头分析、URL/附件沙箱检测和邮箱范围清除操作。适用于网络钓鱼响应、邮件事件、凭据钓鱼、鱼叉式网络钓鱼调查或钓鱼修复相关请求。
tools
票据传递(Pass-the-Ticket,PtT)是一种横向移动技术,使用窃取的 Kerberos 票据(TGT 或 TGS)在不知道用户密码的情况下向服务进行认证。通过从已控制的主机内存中提取 Kerberos 票据,攻击者可以将这些票据注入自己的会话以模拟票据所有者。