import json import os import random from fastapi import FastAPI, Form, HTTPException from fastapi.responses import HTMLResponse, JSONResponse app = FastAPI(title="Duct-Tape SOC Dashboard") LOG_FILE = "/var/log/snort/alert_json.txt" RULES_FILE = "/etc/snort/rules/local.rules" ACTION_LOG = "/var/log/snort/soc_actions.log" HTML_TEMPLATE = """ Duct-Tape SOC Dashboard v3

Duct-Tape SOC

Advanced Snort 3 & LLM Firewall Automation Simulator

Active Rules: 0
Alerts Logged: 0

Quick Scenarios


Chaos Mode

Python & LLM Action Stream

Awaiting actions...

Active local.rules

Loading rules...

Live Snort JSON Tail

Streaming logs...

""" def write_alert(payload): try: with open(LOG_FILE, "a") as f: f.write(json.dumps(payload) + "\n") f.flush() os.fsync(f.fileno()) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to write to log: {e}") @app.get("/", response_class=HTMLResponse) def index(): return HTML_TEMPLATE @app.get("/api/data") def get_data(): rules_content = "File empty or missing." rule_count = 0 if os.path.exists(RULES_FILE): with open(RULES_FILE, "r") as f: rules_content = f.read().strip() rule_count = rules_content.count("drop ") actions_content = "" if os.path.exists(ACTION_LOG): with open(ACTION_LOG, "r") as f: # Grab the last 25 lines of the action stream actions_content = "".join(f.readlines()[-25:]) parsed_alerts = [] total_alerts = 0 if os.path.exists(LOG_FILE): with open(LOG_FILE, "r") as f: lines = f.readlines() total_alerts = len(lines) for line in reversed(lines[-10:]): try: parsed_alerts.append(json.loads(line)) except: continue return JSONResponse(content={ "rules": rules_content, "alerts": parsed_alerts, "actions": actions_content.strip(), "rule_count": rule_count, "total_alerts": total_alerts }) @app.post("/inject-standard") def inject_standard(scenario: str = Form(...)): scenarios = { "ssh_attack": {"proto": "TCP", "src_ap": "185.220.101.5:43210", "dst_ap": "192.168.1.50:22", "rule": "1:2000123:1", "msg": "SSH Brute Force Attempt"}, "nmap_scan": {"proto": "TCP", "src_ap": "45.33.32.156:59832", "dst_ap": "192.168.1.50:80", "rule": "1:2000456:1", "msg": "Nmap Port Scan"}, "ssdp_noise": {"proto": "UDP", "src_ap": "192.168.1.121:1900", "dst_ap": "239.255.255.250:1900", "rule": "116:6:1", "msg": "SSDP Broadcast"}, "mdns_noise": {"proto": "UDP", "src_ap": "192.168.1.83:5353", "dst_ap": "224.0.0.251:5353", "rule": "116:6:1", "msg": "mDNS Multicast"} } if scenario not in scenarios: raise HTTPException(status_code=400, detail="Invalid scenario") write_alert(scenarios[scenario]) return {"status": "success"} @app.post("/inject-random") def inject_random(): src_ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}" src_port = random.randint(1024, 65535) dst_ip = f"192.168.1.{random.randint(2, 254)}" dst_port = random.choice([22, 80, 443, 3389, 8080]) payload = { "proto": random.choice(["TCP", "UDP"]), "src_ap": f"{src_ip}:{src_port}", "dst_ap": f"{dst_ip}:{dst_port}", "rule": f"1:{random.randint(10000, 99999)}:1", "msg": "Simulated Random Attack" } write_alert(payload) return {"status": "success"} @app.post("/clear-rules") def clear_rules(): try: with open(RULES_FILE, "w") as f: f.write("") return {"status": "success"} except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to clear rules: {e}")