"""SekiPOS: a small Flask point-of-sale server.

Provides login-protected product management backed by SQLite, a barcode
``/scan`` endpoint that pushes results to connected browsers over Socket.IO,
and a local image cache for product pictures fetched from OpenFoodFacts.
"""
import contextlib
import logging
import math
import os
import re
import sqlite3
from typing import Optional
from urllib.parse import quote

import requests
from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from flask_login import (
    LoginManager,
    UserMixin,
    current_user,
    login_required,
    login_user,
    logout_user,
)
from flask_socketio import SocketIO, emit
from werkzeug.security import check_password_hash, generate_password_hash

logger = logging.getLogger(__name__)

app = Flask(__name__)
# The hard-coded value is kept only as a fallback so existing deployments keep
# working; set SEKIPOS_SECRET_KEY in any environment that is not a toy.
app.config['SECRET_KEY'] = os.environ.get(
    'SEKIPOS_SECRET_KEY', 'seki_super_secret_key_99'
)
# NOTE(review): "*" allows any origin to open a Socket.IO connection.
socketio = SocketIO(app, cors_allowed_origins="*")

# Auth Setup
login_manager = LoginManager(app)
login_manager.login_view = 'login'

DB_FILE = 'pos_database.db'
CACHE_DIR = 'static/cache'
HTTP_HEADERS = {'User-Agent': 'SekiPOS/1.0'}

# Barcodes end up in file names, so only this conservative alphabet is allowed
# to touch the filesystem (blocks "../" style path traversal).
_SAFE_BARCODE = re.compile(r'[A-Za-z0-9_-]+')

os.makedirs(CACHE_DIR, exist_ok=True)


# --- MODELS ---
class User(UserMixin):
    """Minimal Flask-Login user holding the database id and username."""

    def __init__(self, id, username):
        self.id = id
        self.username = username


# --- DATABASE LOGIC ---
@contextlib.contextmanager
def _db():
    """Yield a SQLite connection that is committed (or rolled back) and closed.

    ``with sqlite3.connect(...)`` on its own only manages the transaction and
    leaves the connection open, so the close is done explicitly here.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db():
    """Create the tables if needed and seed the default ``admin`` account."""
    with _db() as conn:
        conn.execute('''CREATE TABLE IF NOT EXISTS users
                        (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password TEXT)''')
        conn.execute('''CREATE TABLE IF NOT EXISTS products
                        (barcode TEXT PRIMARY KEY, name TEXT, price REAL, image_url TEXT)''')
        # Default user: admin / Pass: choripan1234
        user = conn.execute(
            'SELECT * FROM users WHERE username = ?', ('admin',)
        ).fetchone()
        if not user:
            hashed_pw = generate_password_hash('choripan1234')
            conn.execute(
                'INSERT INTO users (username, password) VALUES (?, ?)',
                ('admin', hashed_pw),
            )


@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for *user_id*, or ``None`` if no such row exists."""
    with _db() as conn:
        row = conn.execute(
            'SELECT id, username FROM users WHERE id = ?', (user_id,)
        ).fetchone()
    return User(row[0], row[1]) if row else None


# --- HELPERS ---
def _cache_path(barcode) -> Optional[str]:
    """Return the cache file path for *barcode*, or ``None`` if it is unsafe.

    A barcode is considered unsafe when it contains anything outside
    ``[A-Za-z0-9_-]``; such values must never be used to build a path.
    """
    if not isinstance(barcode, str) or not _SAFE_BARCODE.fullmatch(barcode):
        return None
    return os.path.join(CACHE_DIR, f"{barcode}.jpg")


def download_image(url, barcode):
    """Cache the image at *url* locally and return the URL to serve it from.

    Returns ``/static/cache/<barcode>.jpg`` when the image is (or already was)
    cached. Returns *url* unchanged when it is empty or not an http(s) URL,
    when the barcode is not safe to use as a file name, or when the download
    fails for any reason (best-effort: failures are logged, never raised).
    """
    if not url or not url.startswith('http'):
        return url

    local_path = _cache_path(barcode)
    if local_path is None:
        return url

    public_url = f"/static/cache/{os.path.basename(local_path)}"
    if os.path.exists(local_path):
        return public_url

    # Download to a temporary name first so an interrupted transfer never
    # leaves a truncated file that later looks like a valid cache hit.
    tmp_path = f"{local_path}.part"
    try:
        with requests.get(url, headers=HTTP_HEADERS, stream=True, timeout=5) as r:
            if r.status_code != 200:
                return url
            with open(tmp_path, 'wb') as f:
                for chunk in r.iter_content(1024):
                    f.write(chunk)
        os.replace(tmp_path, local_path)
    except (requests.RequestException, OSError) as exc:
        logger.warning("Could not cache image %s: %s", url, exc)
        with contextlib.suppress(OSError):
            os.remove(tmp_path)
        return url
    return public_url


def fetch_from_openfoodfacts(barcode):
    """Look *barcode* up on OpenFoodFacts.

    Returns ``{"name": ..., "image": ...}`` when the product is found, where
    ``image`` is the result of :func:`download_image`. Returns ``None`` when
    the product is unknown or the lookup fails (failures are logged).
    """
    # Quote the barcode: it comes from an unauthenticated request parameter.
    url = (
        "https://world.openfoodfacts.org/api/v2/product/"
        f"{quote(str(barcode), safe='')}.json"
    )
    try:
        resp = requests.get(url, headers=HTTP_HEADERS, timeout=5).json()
        if resp.get('status') != 1:
            return None
        p = resp.get('product', {})
        # Fall back to 'Unknown' also when the fields exist but are empty.
        name = (
            p.get('product_name_es')
            or p.get('product_name')
            or p.get('brands')
            or 'Unknown'
        )
        imgs = p.get('selected_images', {}).get('front', {}).get('display', {})
        img_url = imgs.get('es') or imgs.get('en') or p.get('image_url', '')
    except (requests.RequestException, ValueError, AttributeError, TypeError) as exc:
        # ValueError: body was not JSON; AttributeError/TypeError: JSON did
        # not have the expected nested-dict shape.
        logger.warning("OpenFoodFacts lookup failed for %r: %s", barcode, exc)
        return None
    return {"name": name, "image": download_image(img_url, barcode)}


# --- ROUTES ---
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate the user (POST)."""
    if request.method == 'POST':
        user_in = request.form.get('username')
        # Default to '' so a missing field fails the check instead of crashing.
        pass_in = request.form.get('password') or ''
        with _db() as conn:
            user = conn.execute(
                'SELECT * FROM users WHERE username = ?', (user_in,)
            ).fetchone()
        if user and check_password_hash(user[2], pass_in):
            login_user(User(user[0], user[1]))
            return redirect(url_for('index'))
        flash('Invalid credentials.')
    return render_template('login.html')


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them to the login page."""
    logout_user()
    return redirect(url_for('login'))


@app.route('/')
@login_required
def index():
    """Render the product list."""
    with _db() as conn:
        products = conn.execute('SELECT * FROM products').fetchall()
    return render_template('index.html', products=products, user=current_user)


@app.route('/upsert', methods=['POST'])
@login_required
def upsert():
    """Insert a product, or update it when the barcode already exists.

    Expects the form fields ``barcode``, ``name``, ``price`` and ``image_url``.
    A price that is not a finite number is rejected with a flash message,
    because ``scan`` later converts the stored price with ``int()``.
    """
    d = request.form
    try:
        price = float(d['price'])
    except ValueError:
        price = math.nan
    if not math.isfinite(price):
        flash('Invalid price.')
        return redirect(url_for('index'))

    with _db() as conn:
        conn.execute('''INSERT INTO products (barcode, name, price, image_url) VALUES (?,?,?,?)
                        ON CONFLICT(barcode) DO UPDATE SET name=excluded.name,
                        price=excluded.price, image_url=excluded.image_url''',
                     (d['barcode'], d['name'], price, d['image_url']))
    return redirect(url_for('index'))


@app.route('/delete/<barcode>', methods=['POST'])
@login_required
def delete(barcode):
    """Delete the product with *barcode*, drop its cached image, notify clients."""
    with _db() as conn:
        conn.execute('DELETE FROM products WHERE barcode = ?', (barcode,))

    # Clean up cache (skipped when the barcode is not a safe file name).
    img_p = _cache_path(barcode)
    if img_p is not None:
        with contextlib.suppress(FileNotFoundError):
            os.remove(img_p)

    socketio.emit('product_deleted', {"barcode": barcode})
    return redirect(url_for('index'))


@app.route('/scan', methods=['GET'])
def scan():
    """Handle a scanned barcode passed as the ``content`` query parameter.

    Known products are broadcast as ``new_scan``. Unknown ones are looked up
    on OpenFoodFacts and broadcast as ``scan_error`` (with the suggested name
    and image when the lookup succeeds), and the response is a 404.
    """
    # An unexpanded "{content}" placeholder from the scanner counts as empty.
    barcode = request.args.get('content', '').replace('{content}', '')
    if not barcode:
        return jsonify({"err": "empty"}), 400

    with _db() as conn:
        p = conn.execute(
            'SELECT * FROM products WHERE barcode = ?', (barcode,)
        ).fetchone()

    if p:
        socketio.emit('new_scan', {
            "barcode": p[0],
            "name": p[1],
            "price": int(p[2]),
            "image": p[3],
        })
        return jsonify({"status": "ok"})

    ext = fetch_from_openfoodfacts(barcode)
    if ext:
        socketio.emit('scan_error', {
            "barcode": barcode,
            "name": ext['name'],
            "image": ext['image'],
        })
    else:
        socketio.emit('scan_error', {"barcode": barcode})
    return jsonify({"status": "not_found"}), 404


@app.route('/static/cache/<path:filename>')
def serve_cache(filename):
    """Serve a cached product image from ``CACHE_DIR``."""
    return send_from_directory(CACHE_DIR, filename)


if __name__ == '__main__':
    init_db()
    # NOTE(review): debug mode on 0.0.0.0 exposes the interactive debugger to
    # the network. It stays on by default for backward compatibility; set
    # SEKIPOS_DEBUG=0 to turn it off.
    socketio.run(
        app,
        host='0.0.0.0',
        port=5000,
        debug=os.environ.get('SEKIPOS_DEBUG', '1') == '1',
    )