Skip to Content

Zero to Hero: Building a Multi-Tier Cyber Security Operations Center with Google ADK

Hierarchical SOC AI Agent

Zero to Hero: Building a Multi-Tier Cyber Security Operations Center with Google ADK

Introduction

Welcome to the Hierarchical SOC Agent tutorial! Here, we break down how to migrate a multi-tiered Security Operations Center (SOC) simulation to Google's Agent Development Kit (ADK).

This implementation heavily leverages ADK's SequentialAgent and ParallelAgent orchestrators. It maps a traditional incident response team perfectly by ensuring the low-level tier "Doers" (like Log Parsers) pass synthesized context to the mid-tier "Managers", which execute in parallel workflows, who finally synthesize everything to the high-level "Executive" Director making critical business decisions.

Step 1: The Blueprint

Project Structure

We split our application up into elegant, manageable pieces. Here is what the folder structure looks like:


/hierarchical_aI_agents/
    ├── __init__.py                # 🚪 Required package declaration
    ├── agent.py                   # 🧠 The Logic: 3-Tier Multi-Agent Orchestration
    ├── tools.py                   # 🛠️ The Hooks: Fetch logs & scan endpoints
    ├── prompts.py                 # 🎭 The Instructions: Strict separation of concerns
    ├── standalone_soc_agent.py    # ⚙️ Testing CLI script
    ├── .env                       # 🔑 Keys pointing to Gemini 
    └── run_agent.sh               # 🚀 Web UI launcher
        
tools.py

Step 2: External Functions for Low-Level Agents

Our Tier 3 Doers require highly specific tooling restricted by the principle of least privilege.


# tools.py
import random

def read_server_logs(target_server: str) -> dict:
    """Read raw server logs for anomalies."""
    # ... implementation details
    return {"status": "anomaly_detected"}

def ping_endpoint(target_endpoint: str) -> dict:
    """Read-only vulnerability scan."""
    # ... implementation details 
    return {"status": "vulnerable"}

def execute_quarantine(ip_address: str) -> dict:
    """Update firewall rule to block an IP."""
    # ... implementation details
    return {"status": "success"}
        
agent.py

Step 3: Orchestrating the 3-Tier Hierarchy

We use chains of `SequentialAgent` constructs to build logical workflows, then execute those disparate workflows simultaneously with a `ParallelAgent`, and finally culminate in a master `SequentialAgent`.


# agent.py
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent

# ... Agent instantiations omitted for brevity

# --- WORKFLOWS ---
# Incident workflow processes logs, acts, then synthesizes (Tier 3 -> Tier 2)
incident_team = SequentialAgent(
    name="incident_team",
    description="Handles log parsing and mitigation.",
    sub_agents=[log_parser, quarantine_executor, incident_commander]
)

# Threat workflow processes scans, then synthesizes (Tier 3 -> Tier 2)
threat_team = SequentialAgent(
    name="threat_team",
    description="Handles vulnerability scanning and assessment.",
    sub_agents=[vulnerability_scanner, threat_coordinator]
)

# Run mid-tier workflows in parallel
mid_tier = ParallelAgent(
    name="mid_tier_orchestrator",
    description="Runs the incident and threat workflows concurrently.",
    sub_agents=[incident_team, threat_team]
)

# --- MASTER ORCHESTRATOR ---
soc_agent = SequentialAgent(
    name="hierarchical_soc_agent",
    description="Automated Security Operations Center using a 3-tier hierarchical structure.",
    sub_agents=[mid_tier, director]
)
        
Why This Matters: By enforcing a strict hierarchy, we guarantee that the Tier 1 Director never has to read 10,000 lines of firewall logs. The context is distilled at each tier of the workflow seamlessly.
Reference

All-in-One Standalone Code

While the project is naturally split into modules for better maintainability, you can run the entire Hierarchical SOC Agent from a single file. Below is the combined code including Prompts, Tools, and the 3-Tier Agent Logic.

soc_all_in_one.py
import os
import random
import asyncio
from dotenv import load_dotenv
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent

# Load Environment Variables
load_dotenv()

# ==========================================
# 1. PROMPTS (Strict Separation of Concerns)
# ==========================================

LOG_PARSER_PROMPT = """
You are the Log Parser (Tier 3).
Task: Use the `read_server_logs` tool to analyze traffic for the requested target.
Output: Extract a pruned, 3-line contextual packet identifying any anomaly. Do not provide high-level strategy. Focus on identifying attacker IP addresses and attack types.
"""

VULNERABILITY_SCANNER_PROMPT = """
You are the Vulnerability Scanner (Tier 3).
Task: Use the `ping_endpoint` tool to check the system's security posture. 
Output: Identify exact vulnerabilities and system status. Keep it technical and brief.
"""

QUARANTINE_EXECUTOR_PROMPT = """
You are the Quarantine Executor (Tier 3).
Review the Log Parser's findings:
{parsed_logs}

Task: If a specific attacker IP address is identified in the logs, use the `execute_quarantine` tool to block it. If the block fails, clearly mention that.
If no malicious IP is identified, state that no action was taken.
Output: Report the blocking action taken and its status.
"""

INCIDENT_RESPONSE_COMMANDER_PROMPT = """
You are the Incident Response Commander (Tier 2).
You manage specific domains of security and triage alerts.
Review the outputs from your team:

---
LOG PARSER:
{parsed_logs}

QUARANTINE EXECUTOR:
{quarantine_status}
---

Task: Provide a tactical summary of the incident response. Ensure that any failed quarantine actions are highlighted so the Director knows there might still be an active threat.
Return a clear summary paragraph.
"""

THREAT_INTELLIGENCE_COORDINATOR_PROMPT = """
You are the Threat Intelligence Coordinator (Tier 2).
Review the outputs from your team:

---
VULNERABILITY SCANNER:
{scan_results}
---

Task: Provide an assessment of the exploitability and immediate risks of the vulnerabilities found.
Return a clear assessment paragraph.
"""

GLOBAL_THREAT_DIRECTOR_PROMPT = """
You are the Global Threat Director (Tier 1).
You are the master orchestrator responsible for overall network security posture.
You will receive reports from your Tier 2 managers:

Task: Combine these insights into an executive incident report for the CISO. 
Make a "business-critical" decision on whether any servers need to be taken offline based on the severity of the threat and vulnerabilities.

Return ONLY a valid JSON object matching this schema:
{{
    "executive_summary": "High level overview of the attack and mitigation",
    "threat_level": "Defcon 5 to Defcon 1 based on the risk",
    "business_critical_decision": "Decision on taking servers offline or maintaining uptime",
    "human_in_the_loop_required": true or false boolean (based on if Destructive Actions are needed)
}}

---
### INCIDENT RESPONSE REPORT
{incident_report}

---
### THREAT INTELLIGENCE REPORT
{threat_report}
"""

# ==========================================
# 2. TOOLS (Simulated SOC Environment)
# ==========================================

def read_server_logs(target_server: str) -> dict:
    """Reads raw traffic logs for anomalies."""
    anomalies = [
        "SQL Injection attempt detected from 203.0.113.42",
        "Multiple failed login attempts from 198.51.100.23",
        "Unusual outbound traffic spike to unknown external IP",
        "Suspicious payload detected in API request from 45.33.22.11"
    ]
    return {
        "status": "anomaly_detected",
        "target_server": target_server,
        "anomaly": random.choice(anomalies)
    }

def ping_endpoint(target_endpoint: str) -> dict:
    """Performs a read-only vulnerability scan."""
    vulns = ["CVE-2023-45281", "Outdated OpenSSL", "Weak SSH", "Log4Shell"]
    return {
        "status": "vulnerable",
        "endpoint": target_endpoint,
        "vulnerability_found": random.choice(vulns),
        "severity": random.choice(["Medium", "High", "Critical"])
    }

def execute_quarantine(ip_address: str) -> dict:
    """Blocks a specific IP address at the firewall."""
    if random.choice([True, True, False]):
        return {"status": "success", "ip_address": ip_address}
    return {"status": "error", "message": "API timeout"}

# ==========================================
# 3. AGENT DEFINITIONS (3-Tier Hierarchy)
# ==========================================

# Tier 3 (Doers)
log_parser = LlmAgent(
    name="log_parser", model="gemini-2.5-flash",
    instruction=LOG_PARSER_PROMPT, tools=[read_server_logs], output_key="parsed_logs"
)
vulnerability_scanner = LlmAgent(
    name="vulnerability_scanner", model="gemini-2.5-flash",
    instruction=VULNERABILITY_SCANNER_PROMPT, tools=[ping_endpoint], output_key="scan_results"
)
quarantine_executor = LlmAgent(
    name="quarantine_executor", model="gemini-2.5-flash",
    instruction=QUARANTINE_EXECUTOR_PROMPT, tools=[execute_quarantine], output_key="quarantine_status"
)

# Tier 2 (Managers)
incident_commander = LlmAgent(
    name="incident_response_commander", model="gemini-2.5-flash",
    instruction=INCIDENT_RESPONSE_COMMANDER_PROMPT, output_key="incident_report"
)
threat_coordinator = LlmAgent(
    name="threat_intelligence_coordinator", model="gemini-2.5-flash",
    instruction=THREAT_INTELLIGENCE_COORDINATOR_PROMPT, output_key="threat_report"
)

# Workflows
incident_team = SequentialAgent(
    name="incident_team", sub_agents=[log_parser, quarantine_executor, incident_commander]
)
threat_team = SequentialAgent(
    name="threat_team", sub_agents=[vulnerability_scanner, threat_coordinator]
)
mid_tier = ParallelAgent(
    name="mid_tier_orchestrator", sub_agents=[incident_team, threat_team]
)

# Tier 1 (Executive)
director = LlmAgent(
    name="global_threat_director", model="gemini-2.5-pro",
    instruction=GLOBAL_THREAT_DIRECTOR_PROMPT, output_key="final_ciso_report"
)

# Root Orchestrator
soc_agent = SequentialAgent(
    name="hierarchical_soc_agent", sub_agents=[mid_tier, director]
)

# ==========================================
# 4. ENTRY POINT
# ==========================================

if __name__ == "__main__":
    from google.adk.cli.adk_run import serve_agent_cli
    asyncio.run(serve_agent_cli(soc_agent))
🚀 Quick Start: Simply paste the code above into a file named run_soc.py, ensure your GOOGLE_API_KEY is set in your environment, and run python run_soc.py to start the CLI testing mode.
in AI
Beyond Standard RAG: Building a Parallel Legal AI with Google ADK