"""Duct-tape SOC agent for Snort.

Tails Snort's JSON alert log, forwards each relevant alert to a Discord
webhook, and asks an LLM (via OpenRouter) to propose a ``drop`` rule, which is
validated locally before being appended to the local rules file.
"""

import ipaddress
import json
import os
import re
import time

import requests

LOG_FILE = "/var/log/snort/alert_json.txt"
RULES_FILE = "/app/local.rules"
ACTION_LOG = "/var/log/snort/soc_actions.log"
WEBHOOK = os.environ.get("WEBHOOK_URL")
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY")
LLM_MODEL = "anthropic/claude-3.5-haiku"

# Kept so that anything importing this name keeps working.  It is no longer
# used for decisions: the bare "172." prefix also matches public 172.x.x.x
# hosts, so INTERNAL_NETWORKS below is used instead.
INTERNAL_PREFIXES = ("192.168.", "10.", "172.")

# RFC 1918 private ranges; traffic sources inside these are never banned.
INTERNAL_NETWORKS = tuple(
    ipaddress.ip_network(cidr)
    for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
)

VALID_PROTOCOLS = ("tcp", "udp", "icmp", "ip")
DANGEROUS_TARGETS = ("255.255.255.255", "224.0.0", "239.255.255", "ff02::")
NOISE_DESTINATION_PREFIXES = ("224.", "239.", "ff02:")

# Lowest SID this tool will ever hand out for a generated rule.
SID_FLOOR = 1000000

# Without explicit timeouts requests waits forever on a stalled connection,
# which would freeze the tail loop.
LLM_TIMEOUT_SECONDS = 60
DISCORD_TIMEOUT_SECONDS = 10
DISCORD_MAX_ATTEMPTS = 5


def log_msg(msg):
    """Write *msg* to stdout and append it to the shared action log.

    The file write is best-effort: the message has already been printed, so
    an unwritable log file must not take the agent down.
    """
    print(msg)
    try:
        with open(ACTION_LOG, "a", encoding="utf-8") as f:
            f.write(msg + "\n")
            f.flush()
            os.fsync(f.fileno())
    except OSError:
        pass


def _overlaps_internal(network):
    """Return True if *network* overlaps any RFC 1918 private range."""
    return any(
        network.version == internal.version and network.overlaps(internal)
        for internal in INTERNAL_NETWORKS
    )


def _is_internal_host(address):
    """Return True if *address* parses as an IP/CIDR inside a private range.

    Anything that does not parse as an address (e.g. ``"Unknown"``) is treated
    as not internal.
    """
    try:
        network = ipaddress.ip_network(address, strict=False)
    except ValueError:
        return False
    return _overlaps_internal(network)


def _rules_mention(address):
    """Return True if *address* appears as a whole token in the rules file.

    A plain substring test would report ``1.2.3.4`` as present when the file
    only contains ``11.2.3.40``; the look-arounds require the match not to be
    embedded in a longer address.
    """
    try:
        with open(RULES_FILE, "r", encoding="utf-8") as f:
            existing = f.read()
    except FileNotFoundError:
        return False
    pattern = rf"(?<![\w.:]){re.escape(address)}(?![\w.:])"
    return re.search(pattern, existing) is not None


def append_snort_rule(rule_string):
    """Validate an LLM-proposed Snort rule and append it to the rules file.

    The rule text is untrusted (it is produced by an LLM that has read
    attacker-influenced alert data), so it is only written when it is a
    single-line ``drop`` rule with a conventional seven-part header whose
    source is a concrete, external IP address or CIDR block.

    Args:
        rule_string: The proposed rule text.

    Returns:
        A human-readable status string: either a ``"Rejected: ..."`` /
        ``"Failed to write rule: ..."`` message or the success message.
    """
    if not isinstance(rule_string, str) or not rule_string.strip():
        log_msg("CRITICAL: Blocked malformed rule structural syntax.")
        return "Rejected: Malformed Snort syntax."

    rule_string = rule_string.strip()
    tokens = rule_string.split()

    if len(tokens) < 2 or tokens[0] != "drop" or tokens[1] not in VALID_PROTOCOLS:
        log_msg(f"CRITICAL: Blocked invalid protocol syntax: {rule_string}")
        return "Rejected: Rule must start with drop tcp, udp, icmp, or ip."

    if "any any -> any any" in rule_string:
        log_msg("CRITICAL: Blocked LLM attempt to deploy a nuclear 'any any' drop rule.")
        return "Rejected: Rule would cause total network blackout."

    if any(target in rule_string for target in DANGEROUS_TARGETS):
        log_msg("CRITICAL: Blocked LLM attempt to block broadcast/multicast traffic.")
        return "Rejected: Rule targets critical local infrastructure noise."

    # Expected header: drop <proto> <src> <src_port> -> <dst> <dst_port> (...)
    # A multi-line string is rejected outright: every extra line would be
    # written to the rules file as an additional, unvalidated rule.
    is_multiline = len(rule_string.splitlines()) > 1
    if (
        is_multiline
        or "sid:" not in rule_string
        or len(tokens) < 7
        or tokens[4] != "->"
    ):
        log_msg("CRITICAL: Blocked malformed rule structural syntax.")
        return "Rejected: Malformed Snort syntax."

    src_ip = tokens[2]
    if src_ip.lower() == "any":
        log_msg("CRITICAL: Blocked rule. LLM failed to identify a specific attacker IP.")
        return "Rejected: Source IP cannot be 'any'."

    # Variables ($EXTERNAL_NET), negations (!net) and lists ([a,b]) are not a
    # "specific attacker" and could match far more traffic than intended.
    try:
        src_network = ipaddress.ip_network(src_ip, strict=False)
    except ValueError:
        log_msg(f"CRITICAL: Blocked rule. Source is not a concrete IP address: {src_ip}")
        return "Rejected: Source must be a single IP address or CIDR block."

    if _overlaps_internal(src_network):
        log_msg(f"CRITICAL: Blocked LLM attempt to ban internal subnet IP: {src_ip}")
        return "Rejected: Cannot block internal network IPs."

    if _rules_mention(src_ip):
        log_msg(f"Skipping: Attacker {src_ip} is already blocked in local.rules")
        return "Rejected: IP already blocked."

    try:
        with open(RULES_FILE, "a", encoding="utf-8") as f:
            f.write(f"\n# Auto-generated by LLM\n{rule_string}\n")
    except (OSError, UnicodeError) as e:
        return f"Failed to write rule: {e}"
    return "Rule successfully appended to local.rules."


def get_next_sid():
    """Return the next free SID for a generated rule.

    The result is one more than the highest SID found in the rules file, and
    never lower than ``SID_FLOOR + 1`` even if the file only holds low SIDs.
    """
    try:
        with open(RULES_FILE, "r", encoding="utf-8") as f:
            contents = f.read()
    except FileNotFoundError:
        return SID_FLOOR + 1

    # Tolerate optional whitespace, e.g. "sid: 1000005 ;".
    found = [int(sid) for sid in re.findall(r"sid:\s*(\d+)\s*;", contents)]
    return max([SID_FLOOR, *found]) + 1


def ask_llm_for_rule(alert_data):
    """Ask the LLM to propose a block rule for *alert_data* and apply it.

    Does nothing when no OpenRouter API key is configured.  Any tool call to
    ``append_snort_rule`` in the response is passed through the local
    validator; failures are logged rather than raised so the caller's tail
    loop keeps running.

    Args:
        alert_data: The parsed alert (JSON-serialisable) to show to the model.
    """
    if not OPENROUTER_KEY:
        return

    next_sid = get_next_sid()

    headers = {
        "Authorization": f"Bearer {OPENROUTER_KEY}",
        "Content-Type": "application/json",
    }

    prompt = (
        "You are an automated SOC analyst generating Snort 3 block rules.\n"
        f"Analyze this alert payload:\n{json.dumps(alert_data)}\n\n"
        "CRITICAL REQUIREMENTS:\n"
        f"1. Use EXACTLY this syntax: drop [proto] [src] any -> [dst] [dst_port] (msg:\"LLM Block\"; sid:{next_sid}; rev:1;)\n"
        "2. The SOURCE port MUST ALWAYS be 'any'. Attackers use random ephemeral ports.\n"
        "3. If the alert does not specify a clear, non-local external IP address as the attacker, you MUST NOT generate a rule.\n"
        "4. NEVER target 255.255.255.255, multicast ranges, or loopback addresses.\n"
        "5. CONTEXT: The protected internal network is 192.168.1.0/24. The attacker is ALWAYS external. NEVER use an IP starting with 192.168. as the source.\n"
        "6. The source IP must be the specific external attacker. Never use 'any' for the source IP.\n"
        "7. THOUGHT PROCESS: Briefly explain your reasoning in 1-2 sentences in the text response before deciding whether to call the tool or do nothing."
    )

    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "tools": [{
            "type": "function",
            "function": {
                "name": "append_snort_rule",
                "description": "Appends a new Snort rule.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "rule_string": {
                            "type": "string",
                            "description": "The exact valid Snort 3 rule string.",
                        },
                    },
                    "required": ["rule_string"],
                },
            },
        }],
    }

    log_msg("Asking LLM for a block rule...")

    # Top-level boundary: network, HTTP, JSON and response-shape errors are all
    # reported the same way and must not propagate into the tail loop.
    try:
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=LLM_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        response_data = response.json()

        message = response_data["choices"][0]["message"]

        content = message.get("content", "")
        if content:
            log_msg(f"LLM Reasoning: {content.strip()}")

        tool_calls = message.get("tool_calls")
        if tool_calls:
            for tool_call in tool_calls:
                if tool_call["function"]["name"] != "append_snort_rule":
                    continue
                args = json.loads(tool_call["function"]["arguments"])
                rule_string = args.get("rule_string")
                log_msg(f"LLM generated rule: {rule_string}")
                # append_snort_rule rejects a missing/non-string rule itself.
                log_msg(append_snort_rule(rule_string))
        else:
            log_msg("LLM opted not to generate a rule.")

    except Exception as e:
        log_msg(f"Failed to communicate with LLM or parse response: {e}")


def send_discord_alert(data, raw_json_str, proto, src, dst, rule_id):
    """Post one alert to the Discord webhook as a red embed.

    Rate-limit (HTTP 429) responses are retried after the delay Discord asks
    for, up to ``DISCORD_MAX_ATTEMPTS`` attempts so a persistent rate limit
    cannot stall alert processing indefinitely.  All failures are logged, not
    raised.

    Args:
        data: The parsed alert; accepted for interface compatibility, unused.
        raw_json_str: Pretty-printed alert JSON shown in the embed body.
        proto: Protocol name for the embed title.
        src: Source endpoint string.
        dst: Destination endpoint string.
        rule_id: Snort rule identifier that fired.
    """
    embed = {
        "title": f"[SNORT] {proto} Traffic Detected",
        "description": f"**Raw Payload:**\n```json\n{raw_json_str}\n```",
        "color": 16711680,
        "fields": [
            {"name": "Source", "value": f"`{src}`", "inline": True},
            {"name": "Destination", "value": f"`{dst}`", "inline": True},
            {"name": "Rule ID", "value": f"`{rule_id}`", "inline": True},
        ],
        "footer": {"text": "Duct-Tape SOC"},
    }

    for _attempt in range(DISCORD_MAX_ATTEMPTS):
        try:
            response = requests.post(
                WEBHOOK,
                json={"embeds": [embed]},
                timeout=DISCORD_TIMEOUT_SECONDS,
            )
            if response.status_code == 429:
                wait_time = response.json().get("retry_after", 1)
                log_msg(f"Rate limited by Discord. Sleeping for {wait_time}s...")
                time.sleep(wait_time)
                continue
            if response.status_code in (200, 204):
                log_msg("Sent alert to Discord successfully.")
            else:
                log_msg(f"Discord responded: {response.status_code} - {response.text}")
            return
        except Exception as e:
            log_msg(f"Failed to send to Discord: {e}")
            return

    log_msg(
        f"Giving up on Discord alert after {DISCORD_MAX_ATTEMPTS} rate-limited attempts."
    )


def _host_of(endpoint):
    """Return the address part of an ``address:port`` endpoint string.

    Splits on the LAST colon so IPv6 addresses survive intact.
    NOTE(review): assumes Snort always appends ``:port`` to ``src_ap`` /
    ``dst_ap``; a bare IPv6 address would lose its final group - confirm.
    """
    host, sep, _port = endpoint.rpartition(":")
    if not sep:
        return endpoint
    return host.strip("[]")


def _is_noise(src_ip, dst_ip):
    """Return True for alerts that should be dropped without any action.

    That is broadcast/multicast destinations and purely internal traffic.
    """
    if dst_ip == "255.255.255.255":
        return True
    if dst_ip.lower().startswith(NOISE_DESTINATION_PREFIXES):
        return True
    return _is_internal_host(src_ip) and _is_internal_host(dst_ip)


def _process_alert_line(line):
    """Parse one complete alert line and dispatch it to Discord and the LLM."""
    line = line.strip()
    if not line:
        return

    try:
        data = json.loads(line)
        if not isinstance(data, dict):
            raise ValueError("alert is not a JSON object")

        proto = data.get("proto", "UNKNOWN")
        src = data.get("src_ap", "Unknown")
        dst = data.get("dst_ap", "Unknown")

        if _is_noise(_host_of(src), _host_of(dst)):
            return

        rule_id = data.get("rule", "Unknown")
        raw_json_str = json.dumps(data, indent=2)

        send_discord_alert(data, raw_json_str, proto, src, dst, rule_id)
        ask_llm_for_rule(data)

    except Exception as e:
        log_msg(f"Failed to process alert: {e}")


def tail_and_ship():
    """Wait for the Snort alert log, then follow it forever.

    Starts at the end of the file so only alerts written after start-up are
    handled.  Never returns.
    """
    log_msg(f"Waiting for Snort to create {LOG_FILE}...")
    while not os.path.exists(LOG_FILE):
        time.sleep(1)

    log_msg("Log found. Tailing for alerts...")
    with open(LOG_FILE, "r", encoding="utf-8") as f:
        f.seek(0, 2)
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:
                time.sleep(0.5)
                continue

            # readline() can return a fragment while the writer is mid-line;
            # hold it until the terminating newline arrives.
            pending += chunk
            if not pending.endswith("\n"):
                continue

            line, pending = pending, ""
            _process_alert_line(line)


def main():
    """Check configuration, reset the action log, and start tailing."""
    if not WEBHOOK:
        log_msg("Error: WEBHOOK_URL environment variable is missing.")
        # exit() is a convenience injected by the site module and is not
        # guaranteed to exist; SystemExit always is.
        raise SystemExit(1)

    # Initialize the log file
    with open(ACTION_LOG, "w", encoding="utf-8") as f:
        f.write("SOC Action Log Initialized.\n")

    tail_and_ship()


if __name__ == "__main__":
    main()