Hierarchical SOC AI Agent
Zero to Hero: Building a Multi-Tier Cyber Security Operations Center with Google ADK
Introduction
Welcome to the Hierarchical SOC Agent tutorial! Here, we break down how to migrate a multi-tiered Security Operations Center (SOC) simulation to Google's Agent Development Kit (ADK).
This implementation relies on ADK's SequentialAgent and ParallelAgent orchestrators to mirror a traditional incident response team. Low-level "Doers" (such as the Log Parser) pass condensed context to mid-tier "Managers"; the two manager-led workflows run in parallel, and their reports feed the high-level "Executive" Director, who makes the business-critical decisions.
Project Structure
We split the application into small, manageable modules. The folder structure looks like this:
/hierarchical_aI_agents/
├── __init__.py # 🚪 Required package declaration
├── agent.py # 🧠 The Logic: 3-Tier Multi-Agent Orchestration
├── tools.py # 🛠️ The Hooks: Fetch logs & scan endpoints
├── prompts.py # 🎭 The Instructions: Strict separation of concerns
├── standalone_soc_agent.py # ⚙️ Testing CLI script
├── .env # 🔑 Keys pointing to Gemini
└── run_agent.sh # 🚀 Web UI launcher
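Because the `.env` file carries the Gemini credentials, a small preflight check can fail fast when a key is absent. The sketch below is an assumption rather than part of the project: `missing_env` is a hypothetical helper, and `GOOGLE_API_KEY` is the variable name this tutorial mentions for the standalone script.

```python
import os
from typing import Mapping, Optional


def missing_env(names: list[str], env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the names that are unset or blank in the given environment."""
    # Fall back to the real process environment when no mapping is supplied.
    source = os.environ if env is None else env
    return [name for name in names if not source.get(name, "").strip()]
```

One way to use it is to call `missing_env(["GOOGLE_API_KEY"])` right after `load_dotenv()` and stop with a clear message if the returned list is not empty.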
Step 2: External Functions for Low-Level Agents
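Each Tier 3 Doer receives only the tool it needs, following the principle of least privilege: the Log Parser reads logs, the Vulnerability Scanner runs a read-only scan, and only the Quarantine Executor can change a firewall rule.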
# tools.py
import random


def read_server_logs(target_server: str) -> dict:
    """Read raw server logs for anomalies."""
    # ... implementation details
    return {"status": "anomaly_detected"}


def ping_endpoint(target_endpoint: str) -> dict:
    """Read-only vulnerability scan."""
    # ... implementation details
    return {"status": "vulnerable"}


def execute_quarantine(ip_address: str) -> dict:
    """Update firewall rule to block an IP."""
    # ... implementation details
    return {"status": "success"}
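Because `execute_quarantine` is the only tool that changes state, it is a natural place to validate input before acting. The sketch below is an illustrative assumption, not the tutorial's implementation: `execute_quarantine_checked` is a hypothetical wrapper that rejects strings that are not IP addresses, as well as loopback and unspecified addresses, before reporting success.

```python
import ipaddress


def execute_quarantine_checked(ip_address: str) -> dict:
    """Validate the address, then simulate the firewall update."""
    try:
        parsed = ipaddress.ip_address(ip_address.strip())
    except ValueError:
        return {"status": "error", "message": f"not a valid IP address: {ip_address!r}"}
    # Blocking these addresses would cut the host off from itself.
    if parsed.is_loopback or parsed.is_unspecified:
        return {"status": "error", "message": f"refusing to block {parsed}"}
    # A real implementation would call the firewall API here.
    return {"status": "success", "ip_address": str(parsed)}
```

Returning an error dict instead of raising keeps the failure visible to the calling agent, which can then report it upward.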
Step 3: Orchestrating the 3-Tier Hierarchy
We chain agents inside `SequentialAgent` workflows, run the two independent workflows concurrently with a `ParallelAgent`, and wrap everything in a master `SequentialAgent` that hands the results to the Director.
# agent.py
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent

# ... Agent instantiations omitted for brevity

# --- WORKFLOWS ---
# Incident workflow processes logs, acts, then synthesizes (Tier 3 -> Tier 2)
incident_team = SequentialAgent(
    name="incident_team",
    description="Handles log parsing and mitigation.",
    sub_agents=[log_parser, quarantine_executor, incident_commander]
)

# Threat workflow processes scans, then synthesizes (Tier 3 -> Tier 2)
threat_team = SequentialAgent(
    name="threat_team",
    description="Handles vulnerability scanning and assessment.",
    sub_agents=[vulnerability_scanner, threat_coordinator]
)

# Run mid-tier workflows in parallel
mid_tier = ParallelAgent(
    name="mid_tier_orchestrator",
    description="Runs the incident and threat workflows concurrently.",
    sub_agents=[incident_team, threat_team]
)

# --- MASTER ORCHESTRATOR ---
soc_agent = SequentialAgent(
    name="hierarchical_soc_agent",
    description="Automated Security Operations Center using a 3-tier hierarchical structure.",
    sub_agents=[mid_tier, director]
)
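To make the data flow concrete without calling any model, the sketch below mimics the same topology with plain `asyncio`. It is a stand-in, not ADK code: `step`, `run_sequential`, and `run_parallel` are hypothetical helpers, and the shared `state` dict only approximates how one agent's saved output becomes available to the agents that run after it.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[dict], Awaitable[None]]


def step(output_key: str, reads: tuple[str, ...] = ()) -> Step:
    """Build a fake agent that records which earlier outputs it consumed."""
    async def run(state: dict) -> None:
        await asyncio.sleep(0)  # yield control, as a model call would
        inputs = ", ".join(state[key] for key in reads) or "raw request"
        state[output_key] = f"{output_key}({inputs})"
    return run


def run_sequential(*steps: Step) -> Step:
    """Run steps one after another so each sees its predecessors' outputs."""
    async def run(state: dict) -> None:
        for s in steps:
            await s(state)
    return run


def run_parallel(*steps: Step) -> Step:
    """Run independent branches concurrently against the same state."""
    async def run(state: dict) -> None:
        await asyncio.gather(*(s(state) for s in steps))
    return run


incident_team = run_sequential(
    step("parsed_logs"),
    step("quarantine_status", reads=("parsed_logs",)),
    step("incident_report", reads=("parsed_logs", "quarantine_status")),
)
threat_team = run_sequential(
    step("scan_results"),
    step("threat_report", reads=("scan_results",)),
)
soc = run_sequential(
    run_parallel(incident_team, threat_team),
    step("final_ciso_report", reads=("incident_report", "threat_report")),
)


def simulate() -> dict:
    """Execute the whole hierarchy once and return the final shared state."""
    state: dict = {}
    asyncio.run(soc(state))
    return state
```

Calling `simulate()` returns six keys. The final report only exists because both parallel branches finished first, which is the ordering guarantee the outer sequential wrapper is there to provide.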
All-in-One Standalone Code
While the project is naturally split into modules for better maintainability, you can run the entire Hierarchical SOC Agent from a single file. Below is the combined code including Prompts, Tools, and the 3-Tier Agent Logic.
import os
import random
import asyncio
from dotenv import load_dotenv
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
# Load Environment Variables
load_dotenv()
# ==========================================
# 1. PROMPTS (Strict Separation of Concerns)
# ==========================================
LOG_PARSER_PROMPT = """
You are the Log Parser (Tier 3).
Task: Use the `read_server_logs` tool to analyze traffic for the requested target.
Output: Extract a pruned, 3-line contextual packet identifying any anomaly. Do not provide high-level strategy. Focus on identifying attacker IP addresses and attack types.
"""
VULNERABILITY_SCANNER_PROMPT = """
You are the Vulnerability Scanner (Tier 3).
Task: Use the `ping_endpoint` tool to check the system's security posture.
Output: Identify exact vulnerabilities and system status. Keep it technical and brief.
"""
QUARANTINE_EXECUTOR_PROMPT = """
You are the Quarantine Executor (Tier 3).
Review the Log Parser's findings:
{parsed_logs}
Task: If a specific attacker IP address is identified in the logs, use the `execute_quarantine` tool to block it. If the block fails, clearly mention that.
If no malicious IP is identified, state that no action was taken.
Output: Report the blocking action taken and its status.
"""
INCIDENT_RESPONSE_COMMANDER_PROMPT = """
You are the Incident Response Commander (Tier 2).
You manage specific domains of security and triage alerts.
Review the outputs from your team:
---
LOG PARSER:
{parsed_logs}
QUARANTINE EXECUTOR:
{quarantine_status}
---
Task: Provide a tactical summary of the incident response. Ensure that any failed quarantine actions are highlighted so the Director knows there might still be an active threat.
Return a clear summary paragraph.
"""
THREAT_INTELLIGENCE_COORDINATOR_PROMPT = """
You are the Threat Intelligence Coordinator (Tier 2).
Review the outputs from your team:
---
VULNERABILITY SCANNER:
{scan_results}
---
Task: Provide an assessment of the exploitability and immediate risks of the vulnerabilities found.
Return a clear assessment paragraph.
"""
GLOBAL_THREAT_DIRECTOR_PROMPT = """
You are the Global Threat Director (Tier 1).
You are the master orchestrator responsible for overall network security posture.
You will receive reports from your Tier 2 managers:
Task: Combine these insights into an executive incident report for the CISO.
Make a "business-critical" decision on whether any servers need to be taken offline based on the severity of the threat and vulnerabilities.
Return ONLY a valid JSON object matching this schema:
{{
"executive_summary": "High level overview of the attack and mitigation",
"threat_level": "Defcon 5 to Defcon 1 based on the risk",
"business_critical_decision": "Decision on taking servers offline or maintaining uptime",
"human_in_the_loop_required": true or false boolean (based on if Destructive Actions are needed)
}}
---
### INCIDENT RESPONSE REPORT
{incident_report}
---
### THREAT INTELLIGENCE REPORT
{threat_report}
"""
# ==========================================
# 2. TOOLS (Simulated SOC Environment)
# ==========================================
def read_server_logs(target_server: str) -> dict:
    """Reads raw traffic logs for anomalies."""
    anomalies = [
        "SQL Injection attempt detected from 203.0.113.42",
        "Multiple failed login attempts from 198.51.100.23",
        "Unusual outbound traffic spike to unknown external IP",
        "Suspicious payload detected in API request from 45.33.22.11"
    ]
    return {
        "status": "anomaly_detected",
        "target_server": target_server,
        "anomaly": random.choice(anomalies)
    }


def ping_endpoint(target_endpoint: str) -> dict:
    """Performs a read-only vulnerability scan."""
    vulns = ["CVE-2023-45281", "Outdated OpenSSL", "Weak SSH", "Log4Shell"]
    return {
        "status": "vulnerable",
        "endpoint": target_endpoint,
        "vulnerability_found": random.choice(vulns),
        "severity": random.choice(["Medium", "High", "Critical"])
    }


def execute_quarantine(ip_address: str) -> dict:
    """Blocks a specific IP address at the firewall."""
    # Simulate an unreliable firewall API: roughly one call in three fails.
    if random.choice([True, True, False]):
        return {"status": "success", "ip_address": ip_address}
    return {"status": "error", "message": "API timeout"}
# ==========================================
# 3. AGENT DEFINITIONS (3-Tier Hierarchy)
# ==========================================
# Tier 3 (Doers)
log_parser = LlmAgent(
    name="log_parser", model="gemini-2.5-flash",
    instruction=LOG_PARSER_PROMPT, tools=[read_server_logs], output_key="parsed_logs"
)
vulnerability_scanner = LlmAgent(
    name="vulnerability_scanner", model="gemini-2.5-flash",
    instruction=VULNERABILITY_SCANNER_PROMPT, tools=[ping_endpoint], output_key="scan_results"
)
quarantine_executor = LlmAgent(
    name="quarantine_executor", model="gemini-2.5-flash",
    instruction=QUARANTINE_EXECUTOR_PROMPT, tools=[execute_quarantine], output_key="quarantine_status"
)

# Tier 2 (Managers)
incident_commander = LlmAgent(
    name="incident_response_commander", model="gemini-2.5-flash",
    instruction=INCIDENT_RESPONSE_COMMANDER_PROMPT, output_key="incident_report"
)
threat_coordinator = LlmAgent(
    name="threat_intelligence_coordinator", model="gemini-2.5-flash",
    instruction=THREAT_INTELLIGENCE_COORDINATOR_PROMPT, output_key="threat_report"
)

# Workflows
incident_team = SequentialAgent(
    name="incident_team", sub_agents=[log_parser, quarantine_executor, incident_commander]
)
threat_team = SequentialAgent(
    name="threat_team", sub_agents=[vulnerability_scanner, threat_coordinator]
)
mid_tier = ParallelAgent(
    name="mid_tier_orchestrator", sub_agents=[incident_team, threat_team]
)

# Tier 1 (Executive)
director = LlmAgent(
    name="global_threat_director", model="gemini-2.5-pro",
    instruction=GLOBAL_THREAT_DIRECTOR_PROMPT, output_key="final_ciso_report"
)

# Root Orchestrator
soc_agent = SequentialAgent(
    name="hierarchical_soc_agent", sub_agents=[mid_tier, director]
)
# ==========================================
# 4. ENTRY POINT
# ==========================================
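async def main() -> None:
    """Run one SOC pass from the command line and print each agent's final text."""
    # Assumes ADK 1.x, where InMemoryRunner is available and create_session is async.
    from google.adk.runners import InMemoryRunner
    from google.genai import types

    runner = InMemoryRunner(agent=soc_agent, app_name="hierarchical_soc")
    session = await runner.session_service.create_session(
        app_name="hierarchical_soc", user_id="analyst"
    )
    request = input("SOC request> ")
    message = types.Content(role="user", parts=[types.Part(text=request)])
    async for event in runner.run_async(
        user_id="analyst", session_id=session.id, new_message=message
    ):
        if event.is_final_response() and event.content and event.content.parts:
            print(f"[{event.author}] {event.content.parts[0].text}")


if __name__ == "__main__":
    asyncio.run(main())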
Save the code above as run_soc.py, ensure your GOOGLE_API_KEY is set in your environment, and run `python run_soc.py` to start the CLI testing mode.
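The Director is instructed to return only JSON, but model output can still arrive wrapped in a Markdown fence or with a field missing. The sketch below is a hypothetical post-processing helper, not part of the tutorial's code: `parse_ciso_report` checks for the four keys named in the Director prompt and assumes the raw text has already been read from wherever the final response is stored.

```python
import json

FENCE = "`" * 3  # Markdown code fence, built this way to keep the listing readable

REQUIRED_KEYS = {
    "executive_summary",
    "threat_level",
    "business_critical_decision",
    "human_in_the_loop_required",
}


def parse_ciso_report(raw: str) -> dict:
    """Parse the Director's reply and verify the keys the prompt asks for."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (with any language tag) and the closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0]
    report = json.loads(text)  # raises json.JSONDecodeError, a ValueError subclass
    if not isinstance(report, dict):
        raise ValueError("report must be a JSON object")
    missing = REQUIRED_KEYS - report.keys()
    if missing:
        raise ValueError(f"report is missing keys: {sorted(missing)}")
    if not isinstance(report["human_in_the_loop_required"], bool):
        raise ValueError("human_in_the_loop_required must be a boolean")
    return report
```

Every failure surfaces as a `ValueError`, so a caller can catch one exception type and escalate to a human reviewer instead of acting on a malformed report.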