Agent skill

building-automated-malware-submission-pipeline

Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.

Stars 4,300
Forks 470

Install this agent skill to your Project

npx add-skill https://github.com/mukul975/Anthropic-Cybersecurity-Skills/tree/main/skills/building-automated-malware-submission-pipeline

SKILL.md

Building Automated Malware Submission Pipeline

When to Use

Use this skill when:

  • SOC teams face high volume of suspicious file alerts requiring sandbox analysis
  • Manual sandbox submission creates bottlenecks in alert triage workflow
  • Endpoint and email security tools quarantine files needing automated verdict determination
  • Incident response requires rapid malware family identification and IOC extraction

Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.

Prerequisites

  • Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
  • VirusTotal API key (Enterprise for submission, free for lookup)
  • MalwareBazaar API access for known malware lookup
  • File collection mechanism: EDR quarantine API, email gateway export, network capture
  • Python 3.8+ with requests, vt-py, pefile libraries
  • Isolated analysis network with no production connectivity

Workflow

Step 1: Build File Collection Pipeline

Collect suspicious files from multiple sources:

python
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime

class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}

        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50}
        )
        file_ids = response.json()["resources"]

        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id}
            )
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()

            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            msg = email.message_from_binary_file(
                eml_file.open("rb"), policy=policy.default
            )
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"

                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"]
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content)
        }

Step 2: Pre-Screen with Hash Lookups

Check if the file is already known before sandbox submission:

python
import vt

class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown")
            }
        except vt.APIError:
            return {"found": False}

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256}
        )
        data = response.json()
        if data["query_status"] == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags", []),
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown")
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN"
        }

    def close(self):
        self.vt_client.close()

Step 3: Submit to Sandbox for Dynamic Analysis

Cuckoo Sandbox Submission:

python
class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64"
                }
            )
        task_id = response.json()["task_id"]
        return task_id

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        import time
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status == "failed_analysis":
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
        report = response.json()

        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in report.get("network", {}).get("http", [])
                ],
                "hosts": report.get("network", {}).get("hosts", [])
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in report.get("behavior", {}).get("processes", [])
            ],
            "registry_keys": [
                k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
            ]
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True
                }
            )
        return response.json()["data"]["webid"]

Step 4: Extract IOCs and Generate Verdict

python
class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": []
        }

        # Extract IOCs from sandbox report
        if sandbox_report:
            iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
            iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
            iocs["urls"] = [
                h["url"] for h in sandbox_report.get("network", {}).get("http", [])
            ]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0

        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)

        if combined_score >= 100:
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            confidence = "HIGH"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else []
        }

Step 5: Push Results to SIEM

python
def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC"""
    import json

    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]]
        }
    }

    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json"
        },
        json=event,
        verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true",  # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
    )
    return response.status_code == 200

def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"}
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"}
        )

Step 6: Orchestrate the Full Pipeline

python
def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()

    # Step 1: Hash and pre-screen
    hashes = collector.compute_hashes(sample_path)
    pre_screen = screener.pre_screen(hashes["sha256"])

    # Step 2: Submit to sandbox if unknown
    sandbox_report = None
    if pre_screen["needs_sandbox"]:
        task_id = submitter.submit_to_cuckoo(sample_path)
        sandbox_report = submitter.wait_for_analysis(task_id)

    # Step 3: Generate verdict
    verdict = generator.generate_verdict(pre_screen, sandbox_report)
    verdict["sha256"] = hashes["sha256"]
    verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

    # Step 4: Push to SIEM
    push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

    # Step 5: Block if malicious
    if verdict["verdict"] == "MALICIOUS":
        push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

    screener.close()
    return verdict

Key Concepts

Term Definition
Dynamic Analysis Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes)
Static Analysis Examining malware without execution (hash lookup, string analysis, PE header inspection)
Sandbox Evasion Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis
IOC Extraction Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports
Multi-AV Scanning Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection
Verdict Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean

Tools & Systems

  • Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
  • Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
  • Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
  • VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
  • CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping

Common Scenarios

  • Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
  • EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
  • Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
  • Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
  • Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis

Output Format

MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample:       invoice_march.docx
SHA256:       a1b2c3d4e5f6a7b8...
File Type:    Microsoft Word Document (macro-enabled)

Pre-Screening:
  VirusTotal:    34/72 malicious (Emotet.Downloader)
  MalwareBazaar: Tags: emotet, macro, downloader

Sandbox Analysis (Cuckoo):
  Score:         9.2/10 (MALICIOUS)
  Signatures:
    - Macro executes PowerShell download cradle (severity: 8)
    - Process injection into explorer.exe (severity: 9)
    - Connects to known Emotet C2 server (severity: 9)

Extracted IOCs:
  C2 IPs:       185.234.218[.]50:8080, 45.77.123[.]45:443
  Domains:       update-service[.]evil[.]com
  Dropped Files: payload.dll (SHA256: b2c3d4e5...)
  Registry:      HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
  [DONE] IOCs pushed to Splunk threat intel
  [DONE] C2 IPs blocked on firewall
  [DONE] Domain sinkholed on DNS
  [DONE] Hash blocked on endpoint

Expand your agent's capabilities with these related and highly-rated skills.

mukul975/Anthropic-Cybersecurity-Skills

mapping-mitre-attack-techniques

Maps observed adversary behaviors, security alerts, and detection rules to MITRE ATT&CK techniques and sub-techniques to quantify detection coverage and guide control prioritization. Use when building an ATT&CK-based coverage heatmap, tagging SIEM alerts with technique IDs, aligning security controls to adversary playbooks, or reporting threat exposure to executives. Activates for requests involving ATT&CK Navigator, Sigma rules, MITRE D3FEND, or coverage gap analysis.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

hunting-for-spearphishing-indicators

Hunt for spearphishing campaign indicators across email logs, endpoint telemetry, and network data to detect targeted email attacks.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

analyzing-malicious-url-with-urlscan

URLScan.io is a free service for scanning and analyzing suspicious URLs. It captures screenshots, DOM content, HTTP transactions, JavaScript behavior, and network connections of web pages in an isolat

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-zero-standing-privilege-with-cyberark

Deploy CyberArk Secure Cloud Access to eliminate standing privileges in hybrid and multi-cloud environments using just-in-time access with time, entitlement, and approval controls.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-pam-for-database-access

Deploy privileged access management for database systems including Oracle, SQL Server, PostgreSQL, and MySQL. Covers session proxy configuration, credential vaulting, query auditing, dynamic credentia

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

detecting-t1003-credential-dumping-with-edr

Detect OS credential dumping techniques targeting LSASS memory, SAM database, NTDS.dit, and cached credentials using EDR telemetry, Sysmon process access monitoring, and Windows security event correlation.

4,300 470
Explore

Didn't find tool you were looking for?

Be as detailed as possible for better results