external/anthropic-cybersecurity-skills/skills/analyzing-network-covert-channels-in-malware/SKILL.md
Detect and analyze covert communication channels used by malware including DNS tunneling, ICMP exfiltration, steganographic HTTP, and protocol abuse for C2 and data exfiltration.
npx skillsauth add seikaikyo/dash-skills analyzing-network-covert-channels-in-malwareInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Malware uses covert channels to disguise C2 communication and data exfiltration within legitimate-looking network traffic. DNS tunneling encodes data in DNS queries and responses (used by tools like iodine, dnscat2, and malware families like FrameworkPOS). ICMP tunneling hides data in echo request/reply payloads (icmpsh, ptunnel). HTTP covert channels embed C2 data in headers, cookies, or steganographic images. Protocol abuse exploits allowed protocols to bypass firewalls. DNS tunneling detection achieves 99%+ recall with modern ML-based approaches, though low-throughput exfiltration remains challenging. Palo Alto Unit42 tracked three major DNS tunneling campaigns (TrkCdn, SecShow, Savvy Seahorse) through 2024, showing the technique's continued prevalence.
scapy, dpkt, dnslib#!/usr/bin/env python3
"""Detect DNS tunneling and covert channels in network traffic."""
import sys
import json
import math
from collections import Counter, defaultdict
try:
from scapy.all import rdpcap, DNS, DNSQR, DNSRR, IP, ICMP
except ImportError:
print("pip install scapy")
sys.exit(1)
def entropy(data):
if not data:
return 0
freq = Counter(data)
length = len(data)
return -sum((c/length) * math.log2(c/length) for c in freq.values())
def analyze_dns_tunneling(pcap_path):
"""Detect DNS tunneling indicators in PCAP."""
packets = rdpcap(pcap_path)
domain_stats = defaultdict(lambda: {
"queries": 0, "total_qname_len": 0, "subdomain_lengths": [],
"query_types": Counter(), "unique_subdomains": set(),
})
for pkt in packets:
if pkt.haslayer(DNS) and pkt.haslayer(DNSQR):
qname = pkt[DNSQR].qname.decode('utf-8', errors='replace').rstrip('.')
qtype = pkt[DNSQR].qtype
parts = qname.split('.')
if len(parts) >= 3:
base_domain = '.'.join(parts[-2:])
subdomain = '.'.join(parts[:-2])
stats = domain_stats[base_domain]
stats["queries"] += 1
stats["total_qname_len"] += len(qname)
stats["subdomain_lengths"].append(len(subdomain))
stats["query_types"][qtype] += 1
stats["unique_subdomains"].add(subdomain)
# Score domains for tunneling indicators
suspicious = []
for domain, stats in domain_stats.items():
if stats["queries"] < 5:
continue
avg_subdomain_len = (sum(stats["subdomain_lengths"]) /
len(stats["subdomain_lengths"]))
unique_ratio = len(stats["unique_subdomains"]) / stats["queries"]
# Calculate subdomain entropy
all_subdomains = ''.join(stats["unique_subdomains"])
sub_entropy = entropy(all_subdomains)
score = 0
reasons = []
if avg_subdomain_len > 30:
score += 30
reasons.append(f"Long subdomains (avg {avg_subdomain_len:.0f} chars)")
if unique_ratio > 0.9:
score += 25
reasons.append(f"High uniqueness ({unique_ratio:.2%})")
if sub_entropy > 4.0:
score += 25
reasons.append(f"High entropy ({sub_entropy:.2f})")
if stats["query_types"].get(16, 0) > 10: # TXT records
score += 20
reasons.append(f"Many TXT queries ({stats['query_types'][16]})")
if score >= 50:
suspicious.append({
"domain": domain,
"score": score,
"queries": stats["queries"],
"avg_subdomain_length": round(avg_subdomain_len, 1),
"unique_subdomains": len(stats["unique_subdomains"]),
"subdomain_entropy": round(sub_entropy, 2),
"reasons": reasons,
})
return sorted(suspicious, key=lambda x: -x["score"])
def analyze_icmp_tunneling(pcap_path):
"""Detect ICMP tunneling in PCAP."""
packets = rdpcap(pcap_path)
icmp_stats = defaultdict(lambda: {"count": 0, "payload_sizes": [], "payloads": []})
for pkt in packets:
if pkt.haslayer(ICMP) and pkt.haslayer(IP):
src = pkt[IP].src
dst = pkt[IP].dst
key = f"{src}->{dst}"
payload = bytes(pkt[ICMP].payload)
icmp_stats[key]["count"] += 1
icmp_stats[key]["payload_sizes"].append(len(payload))
if len(payload) > 64:
icmp_stats[key]["payloads"].append(payload[:100])
suspicious = []
for flow, stats in icmp_stats.items():
if stats["count"] < 5:
continue
avg_size = sum(stats["payload_sizes"]) / len(stats["payload_sizes"])
if avg_size > 64 or stats["count"] > 100:
suspicious.append({
"flow": flow,
"packets": stats["count"],
"avg_payload_size": round(avg_size, 1),
"reason": "Large/frequent ICMP payloads suggest tunneling",
})
return suspicious
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <pcap_file>")
sys.exit(1)
print("[+] DNS Tunneling Analysis")
dns_results = analyze_dns_tunneling(sys.argv[1])
for r in dns_results:
print(f" {r['domain']} (score: {r['score']})")
for reason in r['reasons']:
print(f" - {reason}")
print("\n[+] ICMP Tunneling Analysis")
icmp_results = analyze_icmp_tunneling(sys.argv[1])
for r in icmp_results:
print(f" {r['flow']}: {r['reason']}")
development
Automates SOC 2 Type II audit preparation including gap assessment against AICPA Trust Services Criteria (CC1-CC9), evidence collection from cloud providers and identity systems, control testing validation, remediation tracking, and continuous compliance monitoring. Covers all five TSC categories (Security, Availability, Processing Integrity, Confidentiality, Privacy) with automated evidence gathering from AWS, Azure, GCP, Okta, GitHub, and Jira. Use when preparing for or maintaining SOC 2 Type II certification.
testing
Performs tabletop exercises for SOC teams simulating security incidents through discussion-based scenarios to test incident response procedures, communication workflows, and decision-making under pressure without impacting production systems. Use when organizations need to validate IR playbooks, train analysts, or meet compliance requirements for incident response testing.
development
Perform security testing of SOAP web services by analyzing WSDL definitions and testing for XML injection, XXE, WS-Security bypass, and SOAPAction spoofing.
devops
Automate credential rotation for service accounts across Active Directory, cloud platforms, and application databases to eliminate stale secrets and reduce compromise risk.