Agent skill

analyzing-network-covert-channels-in-malware

Detect and analyze covert communication channels used by malware including DNS tunneling, ICMP exfiltration, steganographic HTTP, and protocol abuse for C2 and data exfiltration.

Stars 4,300
Forks 470

Install this agent skill to your Project

npx add-skill https://github.com/mukul975/Anthropic-Cybersecurity-Skills/tree/main/skills/analyzing-network-covert-channels-in-malware

SKILL.md

Analyzing Network Covert Channels in Malware

Overview

Malware uses covert channels to disguise C2 communication and data exfiltration within legitimate-looking network traffic. DNS tunneling encodes data in DNS queries and responses (used by tools like iodine, dnscat2, and malware families like FrameworkPOS). ICMP tunneling hides data in echo request/reply payloads (icmpsh, ptunnel). HTTP covert channels embed C2 data in headers, cookies, or steganographic images. Protocol abuse exploits allowed protocols to bypass firewalls. DNS tunneling detection achieves 99%+ recall with modern ML-based approaches, though low-throughput exfiltration remains challenging. Palo Alto Unit42 tracked three major DNS tunneling campaigns (TrkCdn, SecShow, Savvy Seahorse) through 2024, showing the technique's continued prevalence.

When to Use

  • When investigating security incidents that require analyzing network covert channels in malware
  • When building detection rules or threat hunting queries for this domain
  • When SOC analysts need structured procedures for this analysis type
  • When validating security monitoring coverage for related attack techniques

Prerequisites

  • Python 3.9+ with scapy, dpkt, dnslib
  • Wireshark/tshark for PCAP analysis
  • Zeek (formerly Bro) for network monitoring
  • DNS query logging infrastructure
  • Understanding of DNS, ICMP, HTTP protocols at packet level

Workflow

Step 1: DNS Tunneling Detection

python
#!/usr/bin/env python3
"""Detect DNS tunneling and covert channels in network traffic."""
import sys
import json
import math
from collections import Counter, defaultdict

try:
    from scapy.all import rdpcap, DNS, DNSQR, DNSRR, IP, ICMP
except ImportError:
    print("pip install scapy")
    sys.exit(1)


def entropy(data):
    if not data:
        return 0
    freq = Counter(data)
    length = len(data)
    return -sum((c/length) * math.log2(c/length) for c in freq.values())


def analyze_dns_tunneling(pcap_path):
    """Detect DNS tunneling indicators in PCAP."""
    packets = rdpcap(pcap_path)
    domain_stats = defaultdict(lambda: {
        "queries": 0, "total_qname_len": 0, "subdomain_lengths": [],
        "query_types": Counter(), "unique_subdomains": set(),
    })

    for pkt in packets:
        if pkt.haslayer(DNS) and pkt.haslayer(DNSQR):
            qname = pkt[DNSQR].qname.decode('utf-8', errors='replace').rstrip('.')
            qtype = pkt[DNSQR].qtype

            parts = qname.split('.')
            if len(parts) >= 3:
                base_domain = '.'.join(parts[-2:])
                subdomain = '.'.join(parts[:-2])

                stats = domain_stats[base_domain]
                stats["queries"] += 1
                stats["total_qname_len"] += len(qname)
                stats["subdomain_lengths"].append(len(subdomain))
                stats["query_types"][qtype] += 1
                stats["unique_subdomains"].add(subdomain)

    # Score domains for tunneling indicators
    suspicious = []
    for domain, stats in domain_stats.items():
        if stats["queries"] < 5:
            continue

        avg_subdomain_len = (sum(stats["subdomain_lengths"]) /
                             len(stats["subdomain_lengths"]))
        unique_ratio = len(stats["unique_subdomains"]) / stats["queries"]

        # Calculate subdomain entropy
        all_subdomains = ''.join(stats["unique_subdomains"])
        sub_entropy = entropy(all_subdomains)

        score = 0
        reasons = []

        if avg_subdomain_len > 30:
            score += 30
            reasons.append(f"Long subdomains (avg {avg_subdomain_len:.0f} chars)")
        if unique_ratio > 0.9:
            score += 25
            reasons.append(f"High uniqueness ({unique_ratio:.2%})")
        if sub_entropy > 4.0:
            score += 25
            reasons.append(f"High entropy ({sub_entropy:.2f})")
        if stats["query_types"].get(16, 0) > 10:  # TXT records
            score += 20
            reasons.append(f"Many TXT queries ({stats['query_types'][16]})")

        if score >= 50:
            suspicious.append({
                "domain": domain,
                "score": score,
                "queries": stats["queries"],
                "avg_subdomain_length": round(avg_subdomain_len, 1),
                "unique_subdomains": len(stats["unique_subdomains"]),
                "subdomain_entropy": round(sub_entropy, 2),
                "reasons": reasons,
            })

    return sorted(suspicious, key=lambda x: -x["score"])


def analyze_icmp_tunneling(pcap_path):
    """Detect ICMP tunneling in PCAP."""
    packets = rdpcap(pcap_path)
    icmp_stats = defaultdict(lambda: {"count": 0, "payload_sizes": [], "payloads": []})

    for pkt in packets:
        if pkt.haslayer(ICMP) and pkt.haslayer(IP):
            src = pkt[IP].src
            dst = pkt[IP].dst
            key = f"{src}->{dst}"

            payload = bytes(pkt[ICMP].payload)
            icmp_stats[key]["count"] += 1
            icmp_stats[key]["payload_sizes"].append(len(payload))
            if len(payload) > 64:
                icmp_stats[key]["payloads"].append(payload[:100])

    suspicious = []
    for flow, stats in icmp_stats.items():
        if stats["count"] < 5:
            continue
        avg_size = sum(stats["payload_sizes"]) / len(stats["payload_sizes"])
        if avg_size > 64 or stats["count"] > 100:
            suspicious.append({
                "flow": flow,
                "packets": stats["count"],
                "avg_payload_size": round(avg_size, 1),
                "reason": "Large/frequent ICMP payloads suggest tunneling",
            })

    return suspicious


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <pcap_file>")
        sys.exit(1)

    print("[+] DNS Tunneling Analysis")
    dns_results = analyze_dns_tunneling(sys.argv[1])
    for r in dns_results:
        print(f"  {r['domain']} (score: {r['score']})")
        for reason in r['reasons']:
            print(f"    - {reason}")

    print("\n[+] ICMP Tunneling Analysis")
    icmp_results = analyze_icmp_tunneling(sys.argv[1])
    for r in icmp_results:
        print(f"  {r['flow']}: {r['reason']}")

Validation Criteria

  • DNS tunneling detected via entropy, subdomain length, and query volume analysis
  • ICMP covert channels identified through payload size anomalies
  • Tunneling domains distinguished from legitimate CDN/cloud traffic
  • Data exfiltration volume estimated from captured traffic
  • C2 communication patterns and beaconing intervals extracted

References

Expand your agent's capabilities with these related and highly-rated skills.

mukul975/Anthropic-Cybersecurity-Skills

mapping-mitre-attack-techniques

Maps observed adversary behaviors, security alerts, and detection rules to MITRE ATT&CK techniques and sub-techniques to quantify detection coverage and guide control prioritization. Use when building an ATT&CK-based coverage heatmap, tagging SIEM alerts with technique IDs, aligning security controls to adversary playbooks, or reporting threat exposure to executives. Activates for requests involving ATT&CK Navigator, Sigma rules, MITRE D3FEND, or coverage gap analysis.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

hunting-for-spearphishing-indicators

Hunt for spearphishing campaign indicators across email logs, endpoint telemetry, and network data to detect targeted email attacks.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

analyzing-malicious-url-with-urlscan

URLScan.io is a free service for scanning and analyzing suspicious URLs. It captures screenshots, DOM content, HTTP transactions, JavaScript behavior, and network connections of web pages in an isolat

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-zero-standing-privilege-with-cyberark

Deploy CyberArk Secure Cloud Access to eliminate standing privileges in hybrid and multi-cloud environments using just-in-time access with time, entitlement, and approval controls.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-pam-for-database-access

Deploy privileged access management for database systems including Oracle, SQL Server, PostgreSQL, and MySQL. Covers session proxy configuration, credential vaulting, query auditing, dynamic credentia

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

detecting-t1003-credential-dumping-with-edr

Detect OS credential dumping techniques targeting LSASS memory, SAM database, NTDS.dit, and cached credentials using EDR telemetry, Sysmon process access monitoring, and Windows security event correlation.

4,300 470
Explore

Didn't find tool you were looking for?

Be as detailed as possible for better results