skills/building-automated-malware-submission-pipeline/SKILL.md
Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.
npx skillsauth add mukul975/anthropic-cybersecurity-skills building-automated-malware-submission-pipeline

Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Use this skill when:
Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.
requests, vt-py, pefile libraries

Collect suspicious files from multiple sources:
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime
class MalwareCollector:
    """Gather suspicious files into a local quarantine directory.

    Every sample is stored as ``<sha256>.sample`` inside ``quarantine_dir``,
    so the same content collected twice lands on the same path.
    """

    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        """Remember the quarantine directory and create it if it is missing.

        Args:
            quarantine_dir: Directory that receives the collected samples.
                Missing parent directories are created as well.
        """
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def _store_sample(self, data):
        """Write *data* into the quarantine directory.

        Returns:
            Tuple ``(sha256_hex, path)`` for the stored sample.
        """
        sha256 = hashlib.sha256(data).hexdigest()
        filepath = self.quarantine_dir / f"{sha256}.sample"
        filepath.write_bytes(data)
        return sha256, filepath

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon.

        Args:
            edr_api_url: Base URL of the EDR API.
            api_token: Bearer token sent in the ``Authorization`` header.

        Yields:
            Dicts with ``sha256``, ``path`` and ``source`` (always ``"edr"``).

        Raises:
            requests.HTTPError: If the listing request is rejected.
        """
        headers = {"Authorization": f"Bearer {api_token}"}
        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50},
            timeout=30,
        )
        response.raise_for_status()
        file_ids = response.json().get("resources") or []
        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id},
                timeout=60,
            )
            if not dl_response.ok:
                # An HTTP error body is not a sample; storing and hashing it
                # would poison every later pipeline stage. Skip this id and
                # keep collecting the remaining files.
                continue
            sha256, filepath = self._store_sample(dl_response.content)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine.

        Args:
            smtp_quarantine_path: Directory that holds ``*.eml`` files.

        Yields:
            Dicts with ``sha256``, ``path``, ``source`` (``"email"``),
            ``original_filename``, ``sender`` and ``subject``.
        """
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            # Close the handle as soon as the message is parsed.
            with eml_file.open("rb") as handle:
                msg = email.message_from_binary_file(handle, policy=policy.default)
            for attachment in msg.iter_attachments():
                # Take the transfer-decoded raw bytes. Decoding text to str
                # and re-encoding as UTF-8 would change the bytes (and the
                # hash) of attachments sent in any other charset.
                content = attachment.get_payload(decode=True)
                if content is None:
                    # Container parts (e.g. an attached message/rfc822) hold
                    # sub-messages instead of a single payload.
                    content = b"".join(
                        part.as_bytes() for part in attachment.get_payload()
                    )
                sha256, filepath = self._store_sample(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": attachment.get_filename() or "unknown",
                    "sender": msg["From"],
                    "subject": msg["Subject"],
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file.

        The file is read in 1 MiB chunks so large samples are not loaded into
        memory at once.

        Returns:
            Dict with hex digests under ``md5``, ``sha1``, ``sha256`` and the
            byte count under ``size``.
        """
        md5 = hashlib.md5()
        sha1 = hashlib.sha1()
        sha256 = hashlib.sha256()
        size = 0
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                md5.update(chunk)
                sha1.update(chunk)
                sha256.update(chunk)
                size += len(chunk)
        return {
            "md5": md5.hexdigest(),
            "sha1": sha1.hexdigest(),
            "sha256": sha256.hexdigest(),
            "size": size,
        }
Check if the file is already known before sandbox submission:
import vt
class MalwarePreScreener:
    """Look a sample hash up in VirusTotal and MalwareBazaar before sandboxing.

    The instance owns a ``vt.Client``; call :meth:`close` when done, or use the
    object as a context manager.
    """

    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        """Create the VirusTotal client and remember the MalwareBazaar URL."""
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal.

        Returns:
            ``{"found": True, ...}`` with detection counts, threat label and
            file type when the hash is known. ``{"found": False}`` when
            VirusTotal reports ``NotFoundError``. Any other API error (quota,
            authentication, ...) also yields ``found: False`` but carries the
            error code under ``"error"`` so it is not mistaken for a genuine
            miss.
        """
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
        except vt.APIError as err:
            result = {"found": False}
            if err.code != "NotFoundError":
                result["error"] = err.code
            return result

        # Objects that have not been analysed yet may lack these attributes.
        stats = getattr(file_obj, "last_analysis_stats", None) or {}
        classification = getattr(file_obj, "popular_threat_classification", None) or {}
        return {
            "found": True,
            "malicious": stats.get("malicious", 0),
            "suspicious": stats.get("suspicious", 0),
            "undetected": stats.get("undetected", 0),
            "total": sum(stats.values()),
            "threat_label": classification.get("suggested_threat_label", "Unknown"),
            "type": getattr(file_obj, "type_description", "Unknown"),
        }

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar.

        Returns:
            ``{"found": True, ...}`` with signature, tags, file type and
            first-seen date when the service answers ``query_status == "ok"``;
            otherwise ``{"found": False}``. HTTP errors and non-JSON bodies
            add an ``"error"`` entry instead of raising.
        """
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256},
            timeout=30,
        )
        if not response.ok:
            return {"found": False, "error": f"HTTP {response.status_code}"}
        try:
            data = response.json()
        except ValueError:
            return {"found": False, "error": "invalid JSON response"}

        entries = data.get("data") or []
        if data.get("query_status") == "ok" and entries:
            entry = entries[0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                # The key can be present with a null value; normalise to a list.
                "tags": entry.get("tags") or [],
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown"),
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks.

        Verdicts:
            * ``KNOWN_MALICIOUS`` - VirusTotal reports more than 10 malicious
              detections.
            * ``LIKELY_CLEAN`` - VirusTotal knows the file with zero malicious
              and zero suspicious detections, and MalwareBazaar does not list
              it.
            * ``UNKNOWN`` - everything else; these samples need the sandbox.

        Returns:
            Dict with ``sha256``, both lookup results, ``pre_screen_verdict``
            and ``needs_sandbox``.
        """
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        vt_found = vt_result["found"]
        malicious = vt_result.get("malicious", 0)
        suspicious = vt_result.get("suspicious", 0)

        verdict = "UNKNOWN"
        if vt_found and malicious > 10:
            verdict = "KNOWN_MALICIOUS"
        elif (
            vt_found
            and malicious == 0
            and suspicious == 0
            # A hash listed in a malware repository must not skip the sandbox
            # just because no VirusTotal engine flags it.
            and not mb_result.get("found")
        ):
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN",
        }

    def close(self):
        """Close the underlying VirusTotal client."""
        self.vt_client.close()
Cuckoo Sandbox Submission:
class SandboxSubmitter:
    """Submit samples to sandbox services and fetch the resulting reports."""

    def __init__(self, cuckoo_url="http://cuckoo.internal:8090", api_token=None,
                 request_timeout=60):
        """Store connection settings for the Cuckoo REST API.

        Args:
            cuckoo_url: Base URL of the Cuckoo API; a trailing slash is removed
                so request paths never contain ``//``.
            api_token: Optional bearer token sent with every Cuckoo request.
            request_timeout: Per-request timeout in seconds.
        """
        self.cuckoo_url = cuckoo_url.rstrip("/")
        self.request_timeout = request_timeout
        self._headers = {"Authorization": f"Bearer {api_token}"} if api_token else {}

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox.

        Args:
            filepath: Path of the sample to upload.
            timeout: Analysis timeout (seconds) requested from Cuckoo.

        Returns:
            The ``task_id`` assigned by Cuckoo.

        Raises:
            requests.HTTPError: If the submission is rejected.
        """
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                headers=self._headers,
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64",
                },
                timeout=self.request_timeout,
            )
        response.raise_for_status()
        return response.json()["task_id"]

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete.

        Polls the task status until it is ``reported``, reaches any
        ``failed*`` state, or ``max_wait`` seconds of wall-clock time have
        passed.

        Returns:
            The summarized report from :meth:`get_report`, or a dict with an
            ``"error"`` entry on failure or timeout.
        """
        import time

        # A wall-clock deadline also counts time spent in HTTP calls and
        # still terminates when poll_interval is 0.
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.cuckoo_url}/tasks/view/{task_id}",
                headers=self._headers,
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            # Covers failed_analysis as well as any other failed_* state, so
            # a dead task is reported immediately instead of after max_wait.
            if status.startswith("failed"):
                return {"error": "Analysis failed", "status": status}
            time.sleep(poll_interval)
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report.

        Returns:
            Dict with ``task_id``, ``score``, ``signatures``, ``network``
            (``dns``, ``http``, ``hosts``), ``dropped_files``, ``processes``
            and ``registry_keys``. Fields missing from the raw report fall
            back to empty values instead of raising ``KeyError``.
        """
        response = requests.get(
            f"{self.cuckoo_url}/tasks/report/{task_id}",
            headers=self._headers,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        report = response.json()

        network = report.get("network", {})
        behavior = report.get("behavior", {})
        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {
                    "name": s.get("name", "unknown"),
                    "severity": s.get("severity", 0),
                    "description": s.get("description", ""),
                }
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in network.get("dns", []) if "request" in d],
                "http": [
                    {"url": h["uri"], "method": h.get("method", "")}
                    for h in network.get("http", [])
                    if "uri" in h
                ],
                "hosts": network.get("hosts", []),
            },
            "dropped_files": [
                {"name": f.get("name", ""), "sha256": f["sha256"], "size": f.get("size", 0)}
                for f in report.get("dropped", [])
                if "sha256" in f
            ],
            "processes": [
                {
                    "name": p.get("process_name", ""),
                    "pid": p.get("pid"),
                    "command_line": p.get("command_line", ""),
                }
                for p in behavior.get("processes", [])
            ],
            "registry_keys": list(behavior.get("summary", {}).get("regkey_written", [])),
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud.

        Returns:
            The ``webid`` found under ``data`` in the JSON response.

        Raises:
            requests.HTTPError: If the submission is rejected.
        """
        # NOTE(review): request fields and the bearer-token header are kept
        # exactly as before; confirm them against the Joe Sandbox API version
        # in use (booleans are form-encoded as "True"/"False" by requests).
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True,
                },
                timeout=self.request_timeout,
            )
        response.raise_for_status()
        return response.json()["data"]["webid"]
class VerdictGenerator:
    """Merge pre-screening and sandbox results into one verdict plus IOCs."""

    def __init__(self):
        # Cuckoo score threshold.
        # NOTE(review): generate_verdict() does not consult this attribute;
        # the verdict comes from the combined score cut-offs below.
        self.malicious_threshold = 7

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict.

        Scoring: ``vt_malicious * 2 + sandbox_score * 10 + signatures * 5``.
        A score of 100+ is MALICIOUS, 50+ SUSPICIOUS, 20+ POTENTIALLY_UNWANTED,
        anything lower CLEAN.

        Two cases bypass the raw score:
            * ``pre_screen["pre_screen_verdict"] == "KNOWN_MALICIOUS"`` always
              yields MALICIOUS. Such samples skip the sandbox, so their score
              consists of VirusTotal detections only and would otherwise be
              under-rated.
            * A sandbox result carrying an ``"error"`` entry is treated as
              "no sandbox data". Confidence of a non-MALICIOUS verdict then
              drops to LOW and the error is returned under ``sandbox_error``.

        Args:
            pre_screen: Pre-screening dict; ``virustotal.malicious`` and
                ``pre_screen_verdict`` are read when present.
            sandbox_report: Summarized sandbox report, an error dict, or None.

        Returns:
            Dict with ``verdict``, ``confidence``, ``combined_score``,
            ``iocs``, ``vt_detections``, ``sandbox_score``, ``signatures`` and
            ``sandbox_error`` (None when the sandbox did not fail).
        """
        sandbox_error = None
        if sandbox_report and "error" in sandbox_report:
            sandbox_error = sandbox_report["error"]
            sandbox_report = None

        report = sandbox_report or {}
        network = report.get("network", {})
        dropped = report.get("dropped_files", [])
        signatures = report.get("signatures", [])

        # Extract IOCs from sandbox report
        iocs = {
            "ips": network.get("hosts", []),
            "domains": network.get("dns", []),
            "urls": [h["url"] for h in network.get("http", [])],
            "hashes": [f["sha256"] for f in dropped],
            "registry_keys": report.get("registry_keys", [])[:10],
            "files_dropped": dropped,
        }

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = report.get("score", 0)
        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (len(signatures) * 5)
        known_malicious = pre_screen.get("pre_screen_verdict") == "KNOWN_MALICIOUS"

        if combined_score >= 100 or known_malicious:
            verdict, confidence = "MALICIOUS", "HIGH"
        elif combined_score >= 50:
            verdict, confidence = "SUSPICIOUS", "MEDIUM"
        elif combined_score >= 20:
            verdict, confidence = "POTENTIALLY_UNWANTED", "LOW"
        else:
            verdict, confidence = "CLEAN", "HIGH"

        if sandbox_error is not None and verdict != "MALICIOUS":
            # Dynamic analysis was attempted but produced nothing usable.
            confidence = "LOW"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": signatures,
            "sandbox_error": sandbox_error,
        }
def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC.

    Args:
        verdict_result: Verdict dict; must contain ``sha256``, ``verdict``,
            ``confidence``, ``combined_score``, ``vt_detections``,
            ``sandbox_score``, ``iocs`` and ``signatures``. ``threat_label``
            is optional and defaults to ``"Unknown"``.
        splunk_url: Base URL of the Splunk instance.
        splunk_token: HEC token sent in the ``Authorization`` header.

    Returns:
        True when the collector answers with HTTP 200, otherwise False.
    """
    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]],
        },
    }
    # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments;
    # any other value (or none) keeps certificate verification on.
    verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"
    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json",
        },
        json=event,
        verify=verify_tls,
        # Without a timeout an unresponsive collector stalls the pipeline.
        timeout=30,
    )
    return response.status_code == 200
def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure.

    IPs are submitted with action ``block`` and domains with action
    ``sinkhole``. One failing request no longer aborts the remaining
    indicators.

    Values that parse as IP addresses but are not globally routable (private,
    loopback, link-local, reserved, ...) are skipped: sandbox traffic commonly
    includes such addresses and blocking them would hit internal hosts.
    Values that do not parse as an IP address are submitted unchanged.

    Args:
        iocs: Dict that may contain ``ips`` and ``domains`` lists.
        firewall_api: Base URL of the blocking API.

    Returns:
        Dict with the lists ``submitted``, ``failed`` and ``skipped``.
    """
    import ipaddress

    summary = {"submitted": [], "failed": [], "skipped": []}

    def _submit(kind, value, action):
        """POST one indicator and record whether the API accepted it."""
        try:
            response = requests.post(
                f"{firewall_api}/block",
                json={"type": kind, "value": value, "action": action, "source": "malware_pipeline"},
                timeout=15,
            )
        except requests.RequestException:
            summary["failed"].append(value)
            return
        summary["submitted" if response.ok else "failed"].append(value)

    for ip in iocs.get("ips", []):
        try:
            routable = ipaddress.ip_address(ip).is_global
        except ValueError:
            # Not a bare IP (e.g. "host:port"); keep the previous behaviour.
            routable = True
        if not routable:
            summary["skipped"].append(ip)
            continue
        _submit("ip", ip, "block")

    for domain in iocs.get("domains", []):
        if not domain:
            summary["skipped"].append(domain)
            continue
        _submit("domain", domain, "sinkhole")

    return summary
def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline.

    Steps: hash the sample, pre-screen the hash, sandbox it when the
    pre-screen asks for it, generate a verdict, push the verdict to Splunk
    and block the IOCs when the verdict is MALICIOUS.

    Args:
        sample_path: Path of the sample to analyse.
        config: Dict with ``vt_key``, ``cuckoo_url``, ``splunk_url`` and
            ``splunk_token``. ``firewall_api`` is read only for MALICIOUS
            verdicts. Optional ``quarantine_dir`` overrides the collector's
            default directory.

    Returns:
        The verdict dict, extended with ``sha256``, ``threat_label`` and
        ``siem_delivered`` (the result of the Splunk push).
    """
    collector = MalwareCollector(config.get("quarantine_dir", "/opt/malware_quarantine"))
    screener = MalwarePreScreener(config["vt_key"])
    # Release the VirusTotal client even when a later step raises.
    try:
        submitter = SandboxSubmitter(config["cuckoo_url"])
        generator = VerdictGenerator()

        # Step 1: Hash and pre-screen
        hashes = collector.compute_hashes(sample_path)
        pre_screen = screener.pre_screen(hashes["sha256"])

        # Step 2: Submit to sandbox if unknown
        sandbox_report = None
        if pre_screen["needs_sandbox"]:
            task_id = submitter.submit_to_cuckoo(sample_path)
            sandbox_report = submitter.wait_for_analysis(task_id)

        # Step 3: Generate verdict
        verdict = generator.generate_verdict(pre_screen, sandbox_report)
        verdict["sha256"] = hashes["sha256"]
        verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

        # Step 4: Push to SIEM; keep the outcome so callers can see a failed
        # delivery instead of it being dropped.
        verdict["siem_delivered"] = push_to_splunk(
            verdict, config["splunk_url"], config["splunk_token"]
        )

        # Step 5: Block if malicious
        if verdict["verdict"] == "MALICIOUS":
            push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

        return verdict
    finally:
        screener.close()
| Term | Definition |
|------|-----------|
| Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |
MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample: invoice_march.docx
SHA256: a1b2c3d4e5f6a7b8...
File Type: Microsoft Word Document (macro-enabled)
Pre-Screening:
VirusTotal: 34/72 malicious (Emotet.Downloader)
MalwareBazaar: Tags: emotet, macro, downloader
Sandbox Analysis (Cuckoo):
Score: 9.2/10 (MALICIOUS)
Signatures:
- Macro executes PowerShell download cradle (severity: 8)
- Process injection into explorer.exe (severity: 9)
- Connects to known Emotet C2 server (severity: 9)
Extracted IOCs:
C2 IPs: 185.234.218[.]50:8080, 45.77.123[.]45:443
Domains: update-service[.]evil[.]com
Dropped Files: payload.dll (SHA256: b2c3d4e5...)
Registry: HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update
VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
[DONE] IOCs pushed to Splunk threat intel
[DONE] C2 IPs blocked on firewall
[DONE] Domain sinkholed on DNS
[DONE] Hash blocked on endpoint
development
MISP (Malware Information Sharing Platform) is an open-source threat intelligence platform for gathering, sharing, storing, and correlating Indicators of Compromise (IOCs) of targeted attacks, threat
tools
Collects and synthesizes open-source intelligence (OSINT) about threat actors, malicious infrastructure, and attack campaigns using publicly available data sources, passive reconnaissance tools, and dark web monitoring. Use when investigating external threat actor infrastructure, performing pre-engagement reconnaissance for authorized red team assessments, or enriching CTI reports with publicly available adversary context. Activates for requests involving Maltego, Shodan, OSINT framework, SpiderFoot, or infrastructure reconnaissance.
development
Systematically collects, categorizes, and distributes indicators of compromise (IOCs) during and after security incidents to enable detection, blocking, and threat intelligence sharing. Covers network, host, email, and behavioral indicators using STIX/TAXII formats and threat intelligence platforms. Activates for requests involving IOC collection, indicator extraction, threat indicator sharing, compromise indicators, STIX export, or IOC enrichment.
development
Discovering and accessing unprotected pages, APIs, and administrative interfaces by enumerating URLs and bypassing authentication controls during authorized security assessments.