new file: .gitignore

modified:   README.md
	new file:   app/dashboard.py
	new file:   app/main.py
	new file:   docker-compose.yml
	new file:   snort/local.rules
	new file:   snort/snort-logs/soc_actions.log
	new file:   snort/snort.lua
	new file:   snort/snort3-community.rules
This commit is contained in:
2026-05-29 21:22:34 -04:00
parent 03544b828d
commit 19c23117b9
9 changed files with 4923 additions and 2 deletions

233
app/main.py Normal file
View File

@@ -0,0 +1,233 @@
import time
import os
import json
import requests
import re
LOG_FILE = "/var/log/snort/alert_json.txt"
RULES_FILE = "/app/local.rules"
ACTION_LOG = "/var/log/snort/soc_actions.log"
WEBHOOK = os.environ.get("WEBHOOK_URL")
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LLM_MODEL = "anthropic/claude-3.5-haiku"
INTERNAL_PREFIXES = ("192.168.", "10.", "172.")
def log_msg(msg):
"""Writes to standard output and the shared action log for the dashboard."""
print(msg)
try:
with open(ACTION_LOG, "a") as f:
f.write(msg + "\n")
f.flush()
os.fsync(f.fileno())
except Exception:
pass
def append_snort_rule(rule_string):
rule_string = rule_string.strip()
valid_starts = ["drop tcp", "drop udp", "drop icmp", "drop ip"]
if not any(rule_string.startswith(prefix) for prefix in valid_starts):
log_msg(f"CRITICAL: Blocked invalid protocol syntax: {rule_string}")
return "Rejected: Rule must start with drop tcp, udp, icmp, or ip."
if "any any -> any any" in rule_string:
log_msg("CRITICAL: Blocked LLM attempt to deploy a nuclear 'any any' drop rule.")
return "Rejected: Rule would cause total network blackout."
dangerous_targets = ["255.255.255.255", "224.0.0", "239.255.255", "ff02::"]
if any(target in rule_string for target in dangerous_targets):
log_msg(f"CRITICAL: Blocked LLM attempt to block broadcast/multicast traffic.")
return "Rejected: Rule targets critical local infrastructure noise."
if "->" not in rule_string or "sid:" not in rule_string:
log_msg("CRITICAL: Blocked malformed rule structural syntax.")
return "Rejected: Malformed Snort syntax."
parts = rule_string.split()
if len(parts) > 2:
src_ip = parts[2]
if src_ip.lower() == "any":
log_msg("CRITICAL: Blocked rule. LLM failed to identify a specific attacker IP.")
return "Rejected: Source IP cannot be 'any'."
if src_ip.startswith(INTERNAL_PREFIXES):
log_msg(f"CRITICAL: Blocked LLM attempt to ban internal subnet IP: {src_ip}")
return "Rejected: Cannot block internal network IPs."
if os.path.exists(RULES_FILE):
with open(RULES_FILE, "r") as f:
if src_ip in f.read():
log_msg(f"Skipping: Attacker {src_ip} is already blocked in local.rules")
return "Rejected: IP already blocked."
try:
with open(RULES_FILE, "a") as f:
f.write(f"\n# Auto-generated by LLM\n{rule_string}\n")
return "Rule successfully appended to local.rules."
except Exception as e:
return f"Failed to write rule: {e}"
def get_next_sid():
highest_sid = 1000000
if os.path.exists(RULES_FILE):
with open(RULES_FILE, "r") as f:
sids = re.findall(r'sid:(\d+);', f.read())
if sids:
highest_sid = max([int(s) for s in sids])
return highest_sid + 1
def ask_llm_for_rule(alert_data):
if not OPENROUTER_KEY:
return
next_sid = get_next_sid()
headers = {
"Authorization": f"Bearer {OPENROUTER_KEY}",
"Content-Type": "application/json"
}
prompt = (
"You are an automated SOC analyst generating Snort 3 block rules.\n"
f"Analyze this alert payload:\n{json.dumps(alert_data)}\n\n"
"CRITICAL REQUIREMENTS:\n"
f"1. Use EXACTLY this syntax: drop [proto] [src] any -> [dst] [dst_port] (msg:\"LLM Block\"; sid:{next_sid}; rev:1;)\n"
"2. The SOURCE port MUST ALWAYS be 'any'. Attackers use random ephemeral ports.\n"
"3. If the alert does not specify a clear, non-local external IP address as the attacker, you MUST NOT generate a rule.\n"
"4. NEVER target 255.255.255.255, multicast ranges, or loopback addresses.\n"
"5. CONTEXT: The protected internal network is 192.168.1.0/24. The attacker is ALWAYS external. NEVER use an IP starting with 192.168. as the source.\n"
"6. The source IP must be the specific external attacker. Never use 'any' for the source IP.\n"
"7. THOUGHT PROCESS: Briefly explain your reasoning in 1-2 sentences in the text response before deciding whether to call the tool or do nothing."
)
payload = {
"model": LLM_MODEL,
"messages": [{"role": "user", "content": prompt}],
"tools": [{
"type": "function",
"function": {
"name": "append_snort_rule",
"description": "Appends a new Snort rule.",
"parameters": {
"type": "object",
"properties": {
"rule_string": {
"type": "string",
"description": "The exact valid Snort 3 rule string."
}
},
"required": ["rule_string"]
}
}
}]
}
log_msg("Asking LLM for a block rule...")
try:
response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload)
response.raise_for_status()
response_data = response.json()
message = response_data["choices"][0]["message"]
content = message.get("content", "")
if content:
log_msg(f"LLM Reasoning: {content.strip()}")
if "tool_calls" in message and message["tool_calls"]:
for tool_call in message["tool_calls"]:
if tool_call["function"]["name"] == "append_snort_rule":
args = json.loads(tool_call["function"]["arguments"])
rule_string = args.get("rule_string")
log_msg(f"LLM generated rule: {rule_string}")
result = append_snort_rule(rule_string)
log_msg(result)
else:
log_msg("LLM opted not to generate a rule.")
except Exception as e:
log_msg(f"Failed to communicate with LLM or parse response: {e}")
def send_discord_alert(data, raw_json_str, proto, src, dst, rule_id):
embed = {
"title": f"[SNORT] {proto} Traffic Detected",
"description": f"**Raw Payload:**\n```json\n{raw_json_str}\n```",
"color": 16711680,
"fields": [
{"name": "Source", "value": f"`{src}`", "inline": True},
{"name": "Destination", "value": f"`{dst}`", "inline": True},
{"name": "Rule ID", "value": f"`{rule_id}`", "inline": True}
],
"footer": {"text": "Duct-Tape SOC"}
}
while True:
try:
response = requests.post(WEBHOOK, json={"embeds": [embed]})
if response.status_code == 429:
wait_time = response.json().get("retry_after", 1)
log_msg(f"Rate limited by Discord. Sleeping for {wait_time}s...")
time.sleep(wait_time)
continue
elif response.status_code in [200, 204]:
log_msg("Sent alert to Discord successfully.")
break
else:
log_msg(f"Discord responded: {response.status_code} - {response.text}")
break
except Exception as e:
log_msg(f"Failed to send to Discord: {e}")
break
def tail_and_ship():
log_msg(f"Waiting for Snort to create {LOG_FILE}...")
while not os.path.exists(LOG_FILE):
time.sleep(1)
log_msg("Log found. Tailing for alerts...")
with open(LOG_FILE, "r") as f:
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.5)
continue
try:
data = json.loads(line)
proto = data.get("proto", "UNKNOWN")
src = data.get("src_ap", "Unknown")
dst = data.get("dst_ap", "Unknown")
src_ip = src.split(':')[0] if ':' in src else src
dst_ip = dst.split(':')[0] if ':' in dst else dst
if dst_ip == "255.255.255.255" or dst_ip.startswith("224.") or dst_ip.startswith("239.") or dst_ip.startswith("ff02:"):
continue
if src_ip.startswith(INTERNAL_PREFIXES) and dst_ip.startswith(INTERNAL_PREFIXES):
continue
rule_id = data.get("rule", "Unknown")
raw_json_str = json.dumps(data, indent=2)
send_discord_alert(data, raw_json_str, proto, src, dst, rule_id)
ask_llm_for_rule(data)
except Exception as e:
log_msg(f"Failed to process alert: {e}")
if __name__ == "__main__":
if not WEBHOOK:
log_msg("Error: WEBHOOK_URL environment variable is missing.")
exit(1)
# Initialize the log file
with open(ACTION_LOG, "w") as f:
f.write("SOC Action Log Initialized.\n")
tail_and_ship()