modified: README.md new file: app/dashboard.py new file: app/main.py new file: docker-compose.yml new file: snort/local.rules new file: snort/snort-logs/soc_actions.log new file: snort/snort.lua new file: snort/snort3-community.rules
233 lines
9.0 KiB
Python
233 lines
9.0 KiB
Python
import ipaddress
import json
import os
import re
import sys
import time

import requests
|
|
|
|
# Snort JSON alert file that tail_and_ship() follows for new alerts.
LOG_FILE = "/var/log/snort/alert_json.txt"
# Rules file that append_snort_rule() appends generated drop rules to.
RULES_FILE = "/app/local.rules"
# Action log that log_msg() appends every message to.
ACTION_LOG = "/var/log/snort/soc_actions.log"
# Discord webhook URL; the entry point refuses to start without it.
WEBHOOK = os.environ.get("WEBHOOK_URL")
# OpenRouter API key; ask_llm_for_rule() is a no-op when it is unset.
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")

LLM_MODEL = "anthropic/claude-3.5-haiku"

# String prefixes of RFC 1918 private IPv4 space, usable with str.startswith().
# Only 172.16.0.0/12 (172.16.x.x - 172.31.x.x) is private; a bare "172."
# prefix would also classify public 172.x.x.x hosts as internal.
INTERNAL_PREFIXES = ("192.168.", "10.") + tuple(
    f"172.{second_octet}." for second_octet in range(16, 32)
)
|
|
|
|
def log_msg(msg):
    """Write *msg* to standard output and append it to the shared action log.

    The message is printed first, then appended as one line to ``ACTION_LOG``
    and fsync'ed so that other readers of the file see it immediately.

    Writing the file is best-effort: an I/O failure must never take down the
    alert pipeline, so it is reported on stderr instead of being raised.

    Args:
        msg: The text to log. Non-string values are formatted with ``str()``.
    """
    print(msg)
    try:
        with open(ACTION_LOG, "a", encoding="utf-8", errors="replace") as f:
            f.write(f"{msg}\n")
            f.flush()
            # Force the line to disk so a concurrent reader is not left
            # looking at a stale file.
            os.fsync(f.fileno())
    except OSError as exc:
        # Only filesystem errors are expected here; surface them rather than
        # swallowing everything silently.
        print(f"WARNING: could not write to {ACTION_LOG}: {exc}", file=sys.stderr)
|
|
|
|
# Protocols a generated drop rule is allowed to use.
_VALID_PROTOCOLS = ("tcp", "udp", "icmp", "ip")

# Address space that must never be the source of an auto-generated drop rule:
# RFC 1918 private ranges, loopback and link-local (IPv4 and IPv6).
_PROTECTED_NETWORKS = tuple(
    ipaddress.ip_network(cidr)
    for cidr in (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "169.254.0.0/16",
        "::1/128",
        "fc00::/7",
        "fe80::/10",
    )
)


def _drop_rule_sources(path):
    """Return the set of source networks used by ``drop`` rules in *path*.

    Lines that are not drop rules (comments, other actions) and sources that
    are not plain IPs/CIDRs are ignored. A missing file yields an empty set.
    """
    sources = set()
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                tokens = line.split()
                if len(tokens) < 3 or tokens[0] != "drop":
                    continue
                try:
                    sources.add(ipaddress.ip_network(tokens[2], strict=False))
                except ValueError:
                    continue
    except FileNotFoundError:
        pass
    return sources


def append_snort_rule(rule_string, rules_file=None):
    """Validate an LLM-generated Snort drop rule and append it to the rules file.

    The rule text is untrusted model output, so it is checked before it is
    written:

    * it must be a single-line string starting with ``drop`` and one of
      tcp/udp/icmp/ip;
    * it must not be an ``any any -> any any`` rule;
    * it must not mention broadcast/multicast targets;
    * it must contain ``->`` and ``sid:``;
    * its source (third token) must be one concrete IP address or CIDR that
      does not overlap private, loopback or link-local address space;
    * the same source must not already have a drop rule in the file.

    Args:
        rule_string: The candidate rule text.
        rules_file: Path of the rules file to check and append to. Defaults
            to the module-level ``RULES_FILE``.

    Returns:
        A human-readable status string: a ``"Rejected: ..."`` reason,
        ``"Rule successfully appended to local.rules."``, or
        ``"Failed to write rule: ..."`` when the file cannot be written.
    """
    target = RULES_FILE if rules_file is None else rules_file

    # The tool-call arguments may omit the field entirely (None) or carry a
    # non-string value.
    if not isinstance(rule_string, str):
        log_msg("CRITICAL: Blocked malformed rule structural syntax.")
        return "Rejected: Malformed Snort syntax."

    rule_string = rule_string.strip()

    # A single call must only ever add a single rule; embedded line breaks
    # would let extra, unvalidated rules ride along into the file.
    if "\n" in rule_string or "\r" in rule_string:
        log_msg(f"CRITICAL: Blocked multi-line rule injection attempt: {rule_string!r}")
        return "Rejected: Rule must be a single line."

    parts = rule_string.split()
    # Whitespace-normalised copy used for pattern checks only, so irregular
    # spacing cannot dodge them; the original text is what gets written.
    normalized = " ".join(parts)

    if len(parts) < 2 or parts[0] != "drop" or parts[1] not in _VALID_PROTOCOLS:
        log_msg(f"CRITICAL: Blocked invalid protocol syntax: {rule_string}")
        return "Rejected: Rule must start with drop tcp, udp, icmp, or ip."

    if "any any -> any any" in normalized:
        log_msg("CRITICAL: Blocked LLM attempt to deploy a nuclear 'any any' drop rule.")
        return "Rejected: Rule would cause total network blackout."

    dangerous_targets = ["255.255.255.255", "224.0.0", "239.255.255", "ff02::"]
    if any(target in rule_string for target in dangerous_targets):
        log_msg("CRITICAL: Blocked LLM attempt to block broadcast/multicast traffic.")
        return "Rejected: Rule targets critical local infrastructure noise."

    if "->" not in rule_string or "sid:" not in rule_string or len(parts) < 3:
        log_msg("CRITICAL: Blocked malformed rule structural syntax.")
        return "Rejected: Malformed Snort syntax."

    src_ip = parts[2]

    if src_ip.lower() == "any":
        log_msg("CRITICAL: Blocked rule. LLM failed to identify a specific attacker IP.")
        return "Rejected: Source IP cannot be 'any'."

    # Variables ($EXTERNAL_NET), negations (!net) and lists are not a
    # "specific attacker" and can match nearly everything, so only a literal
    # address or CIDR is accepted.
    try:
        src_net = ipaddress.ip_network(src_ip, strict=False)
    except ValueError:
        log_msg(f"CRITICAL: Blocked rule. Source is not a specific IP address: {src_ip}")
        return "Rejected: Source must be a specific IP address."

    # overlaps() also catches supernets such as 0.0.0.0/0 that would swallow
    # the protected ranges.
    if any(
        src_net.version == protected.version and src_net.overlaps(protected)
        for protected in _PROTECTED_NETWORKS
    ):
        log_msg(f"CRITICAL: Blocked LLM attempt to ban internal subnet IP: {src_ip}")
        return "Rejected: Cannot block internal network IPs."

    # Compare parsed networks instead of searching the raw file text, so
    # "1.2.3.4" is not mistaken for "11.2.3.45" or for text in a comment.
    if src_net in _drop_rule_sources(target):
        log_msg(f"Skipping: Attacker {src_ip} is already blocked in local.rules")
        return "Rejected: IP already blocked."

    try:
        with open(target, "a", encoding="utf-8") as f:
            f.write(f"\n# Auto-generated by LLM\n{rule_string}\n")
    except OSError as e:
        return f"Failed to write rule: {e}"
    return "Rule successfully appended to local.rules."
|
|
|
|
def get_next_sid(rules_file=None):
    """Return the next unused Snort rule SID for the rules file.

    Scans the file for ``sid:<number>;`` options (whitespace around the colon
    and before the semicolon is tolerated) and returns one more than the
    largest SID found. The result is never below 1000001, even when the file
    is missing, has no SIDs, or only contains smaller SIDs.

    Args:
        rules_file: Path of the rules file to scan. Defaults to the
            module-level ``RULES_FILE``.

    Returns:
        int: The SID to use for the next generated rule.
    """
    path = RULES_FILE if rules_file is None else rules_file
    # Base SID for generated rules. NOTE(review): Snort conventionally
    # reserves lower SIDs for distributed rule sets -- confirm.
    base_sid = 1000000

    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()
    except FileNotFoundError:
        return base_sid + 1

    found = [int(sid) for sid in re.findall(r"\bsid\s*:\s*(\d+)\s*;", text)]
    # Clamp to the base so a file holding only small SIDs cannot pull the
    # generated SID below it.
    return max(found + [base_sid]) + 1
|
|
|
|
def ask_llm_for_rule(alert_data, timeout=30):
    """Ask the LLM whether to block the alert's source and apply its answer.

    Builds a prompt around the JSON-serialised alert, sends it to the
    OpenRouter chat-completions endpoint with a single ``append_snort_rule``
    tool, logs any free-text reasoning, and passes each returned tool call's
    ``rule_string`` to ``append_snort_rule`` (which validates it).

    Does nothing when ``OPENROUTER_KEY`` is not configured. All failures are
    logged rather than raised, so one bad alert cannot stop the caller.

    Args:
        alert_data: The parsed alert; must be JSON-serialisable.
        timeout: Seconds to wait for the HTTP request. Without a timeout a
            stalled connection would block the caller indefinitely.

    Returns:
        None.
    """
    if not OPENROUTER_KEY:
        return

    next_sid = get_next_sid()
    headers = {
        "Authorization": f"Bearer {OPENROUTER_KEY}",
        "Content-Type": "application/json",
    }

    prompt = (
        "You are an automated SOC analyst generating Snort 3 block rules.\n"
        f"Analyze this alert payload:\n{json.dumps(alert_data)}\n\n"
        "CRITICAL REQUIREMENTS:\n"
        f"1. Use EXACTLY this syntax: drop [proto] [src] any -> [dst] [dst_port] (msg:\"LLM Block\"; sid:{next_sid}; rev:1;)\n"
        "2. The SOURCE port MUST ALWAYS be 'any'. Attackers use random ephemeral ports.\n"
        "3. If the alert does not specify a clear, non-local external IP address as the attacker, you MUST NOT generate a rule.\n"
        "4. NEVER target 255.255.255.255, multicast ranges, or loopback addresses.\n"
        "5. CONTEXT: The protected internal network is 192.168.1.0/24. The attacker is ALWAYS external. NEVER use an IP starting with 192.168. as the source.\n"
        "6. The source IP must be the specific external attacker. Never use 'any' for the source IP.\n"
        "7. THOUGHT PROCESS: Briefly explain your reasoning in 1-2 sentences in the text response before deciding whether to call the tool or do nothing."
    )

    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "tools": [{
            "type": "function",
            "function": {
                "name": "append_snort_rule",
                "description": "Appends a new Snort rule.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "rule_string": {
                            "type": "string",
                            "description": "The exact valid Snort 3 rule string.",
                        },
                    },
                    "required": ["rule_string"],
                },
            },
        }],
    }

    log_msg("Asking LLM for a block rule...")
    try:
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        message = response.json()["choices"][0]["message"]

        # "content" may be absent or null when the model only calls the tool.
        content = message.get("content") or ""
        if content:
            log_msg(f"LLM Reasoning: {content.strip()}")

        tool_calls = message.get("tool_calls")
        if not tool_calls:
            log_msg("LLM opted not to generate a rule.")
            return

        for tool_call in tool_calls:
            if tool_call["function"]["name"] != "append_snort_rule":
                continue
            args = json.loads(tool_call["function"]["arguments"])
            # The arguments are model output: tolerate a non-object payload.
            rule_string = args.get("rule_string") if isinstance(args, dict) else None
            log_msg(f"LLM generated rule: {rule_string}")
            log_msg(append_snort_rule(rule_string))
    except Exception as e:
        # Deliberately broad: this is the per-alert boundary, and a network,
        # HTTP, or response-shape error must be logged, not propagated.
        log_msg(f"Failed to communicate with LLM or parse response: {e}")
|
|
|
|
def send_discord_alert(data, raw_json_str, proto, src, dst, rule_id,
                       max_retries=5, timeout=10):
    """Post one alert to the Discord webhook as an embed.

    On HTTP 429 the call sleeps for the ``retry_after`` value from the
    response body (1 second if it is missing or unparsable) and retries, up
    to *max_retries* times. Any other status ends the attempt; the outcome is
    logged via ``log_msg`` and nothing is raised.

    Args:
        data: The parsed alert. Unused here; kept for interface compatibility.
        raw_json_str: Pretty-printed alert JSON shown in the embed body.
        proto: Protocol label used in the embed title.
        src: Source endpoint shown in the "Source" field.
        dst: Destination endpoint shown in the "Destination" field.
        rule_id: Rule identifier shown in the "Rule ID" field.
        max_retries: Maximum number of retries after a rate-limit response,
            so a persistent 429 cannot stall the caller forever.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        None.
    """
    embed = {
        "title": f"[SNORT] {proto} Traffic Detected",
        "description": f"**Raw Payload:**\n```json\n{raw_json_str}\n```",
        "color": 16711680,
        "fields": [
            {"name": "Source", "value": f"`{src}`", "inline": True},
            {"name": "Destination", "value": f"`{dst}`", "inline": True},
            {"name": "Rule ID", "value": f"`{rule_id}`", "inline": True},
        ],
        "footer": {"text": "Duct-Tape SOC"},
    }

    for attempt in range(max_retries + 1):
        try:
            response = requests.post(WEBHOOK, json={"embeds": [embed]}, timeout=timeout)
        except requests.RequestException as e:
            log_msg(f"Failed to send to Discord: {e}")
            return

        if response.status_code == 429:
            if attempt == max_retries:
                log_msg(f"Giving up on Discord alert after {max_retries} rate-limited retries.")
                return
            try:
                wait_time = max(0.0, float(response.json().get("retry_after", 1)))
            except (ValueError, TypeError, AttributeError):
                # Body was not the expected JSON object; back off briefly.
                wait_time = 1.0
            log_msg(f"Rate limited by Discord. Sleeping for {wait_time}s...")
            time.sleep(wait_time)
            continue

        if response.status_code in (200, 204):
            log_msg("Sent alert to Discord successfully.")
        else:
            log_msg(f"Discord responded: {response.status_code} - {response.text}")
        return
|
|
|
|
def _strip_port(endpoint):
    """Return the address part of an ``address:port`` endpoint string.

    Only the text after the LAST colon is removed, so addresses that contain
    colons themselves are not truncated at their first group.
    NOTE(review): assumes the port is always the final colon-separated
    field -- confirm against Snort's alert_json output for IPv6.
    """
    if ":" not in endpoint:
        return endpoint
    return endpoint.rsplit(":", 1)[0]


def tail_and_ship():
    """Follow the Snort alert log forever and ship every relevant new alert.

    Waits for ``LOG_FILE`` to exist, seeks to its end, then polls for new
    lines. Each complete line is parsed as JSON; alerts whose destination is
    broadcast/multicast, or whose source and destination are both internal,
    are skipped. Every other alert is sent to Discord and handed to the LLM.

    Errors while handling one line are logged and do not stop the loop.
    This function never returns.
    """
    log_msg(f"Waiting for Snort to create {LOG_FILE}...")
    while not os.path.exists(LOG_FILE):
        time.sleep(1)

    log_msg("Log found. Tailing for alerts...")
    with open(LOG_FILE, "r") as f:
        # Start at end-of-file: only alerts written from now on are shipped.
        f.seek(0, 2)
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:
                time.sleep(0.5)
                continue

            # readline() can return a partially written line (no trailing
            # newline); keep buffering until the writer finishes it.
            pending += chunk
            if not pending.endswith("\n"):
                continue
            line, pending = pending.strip(), ""
            if not line:
                continue

            try:
                data = json.loads(line)
                proto = data.get("proto", "UNKNOWN")
                src = data.get("src_ap", "Unknown")
                dst = data.get("dst_ap", "Unknown")

                src_ip = _strip_port(src)
                dst_ip = _strip_port(dst)

                # Broadcast / multicast destinations are skipped.
                if dst_ip == "255.255.255.255" or dst_ip.startswith(("224.", "239.", "ff02:")):
                    continue

                # Traffic that stays inside the internal ranges is skipped.
                if src_ip.startswith(INTERNAL_PREFIXES) and dst_ip.startswith(INTERNAL_PREFIXES):
                    continue

                rule_id = data.get("rule", "Unknown")
                raw_json_str = json.dumps(data, indent=2)

                send_discord_alert(data, raw_json_str, proto, src, dst, rule_id)
                ask_llm_for_rule(data)
            except Exception as e:
                # Per-alert boundary: log and keep tailing.
                log_msg(f"Failed to process alert: {e}")
|
|
|
|
if __name__ == "__main__":
    if not WEBHOOK:
        log_msg("Error: WEBHOOK_URL environment variable is missing.")
        # The bare exit() helper is injected by the site module for
        # interactive use and is not guaranteed to exist (e.g. python -S);
        # raising SystemExit is the portable equivalent.
        raise SystemExit(1)

    # Initialize the log file (mode "w" truncates any previous run's log).
    with open(ACTION_LOG, "w", encoding="utf-8") as f:
        f.write("SOC Action Log Initialized.\n")

    tail_and_ship()