124 lines
4.6 KiB
Python
124 lines
4.6 KiB
Python
import os
|
|
import json
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from core.db import get_db_connection
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
login_manager = LoginManager()
|
|
login_manager.login_view = 'auth.login'
|
|
|
|
class User(UserMixin):
|
|
def __init__(self, id, username):
|
|
self.id = id
|
|
self.username = username
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
with get_db_connection() as conn:
|
|
user = conn.execute('SELECT id, username FROM users WHERE id = ?', (user_id,)).fetchone()
|
|
return User(user[0], user[1]) if user else None
|
|
|
|
@auth_bp.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
user_in = request.form.get('username')
|
|
pass_in = request.form.get('password')
|
|
with get_db_connection() as conn:
|
|
user = conn.execute('SELECT * FROM users WHERE username = ?', (user_in,)).fetchone()
|
|
if user and check_password_hash(user[2], pass_in):
|
|
login_user(User(user[0], user[1]))
|
|
return redirect(url_for('inventory.inventory'))
|
|
flash('Invalid credentials.')
|
|
return render_template('login.html')
|
|
|
|
@auth_bp.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('auth.login'))
|
|
|
|
@auth_bp.route('/settings/update', methods=['POST'])
|
|
@login_required
|
|
def update_settings():
|
|
new_password = request.form.get('password')
|
|
profile_pic = request.form.get('profile_pic')
|
|
|
|
with get_db_connection() as conn:
|
|
if new_password and len(new_password) > 0:
|
|
hashed_pw = generate_password_hash(new_password)
|
|
conn.execute('UPDATE users SET password = ? WHERE id = ?', (hashed_pw, current_user.id))
|
|
|
|
if profile_pic:
|
|
conn.execute('UPDATE users SET profile_pic = ? WHERE id = ?', (profile_pic, current_user.id))
|
|
conn.commit()
|
|
|
|
flash('Configuración actualizada')
|
|
return redirect(request.referrer)
|
|
|
|
@auth_bp.route('/api/instance/config')
|
|
@login_required
|
|
def get_instance_config():
|
|
db_dir = os.path.dirname(current_app.config.get('DB_FILE', ''))
|
|
instance_file = os.path.join(db_dir, 'instance.json')
|
|
if os.path.exists(instance_file):
|
|
with open(instance_file) as f:
|
|
config = json.load(f)
|
|
else:
|
|
config = {}
|
|
return jsonify({
|
|
"instance_id": config.get('instance_id', ''),
|
|
"display_name": config.get('display_name', 'Caja Principal'),
|
|
"server_url": config.get('server_url', ''),
|
|
"sync_secret": config.get('sync_secret', ''),
|
|
"mode": config.get('mode', 'desktop')
|
|
})
|
|
|
|
@auth_bp.route('/api/instance/config', methods=['POST'])
|
|
@login_required
|
|
def save_instance_config():
|
|
data = request.get_json()
|
|
db_dir = os.path.dirname(current_app.config.get('DB_FILE', ''))
|
|
instance_file = os.path.join(db_dir, 'instance.json')
|
|
if os.path.exists(instance_file):
|
|
with open(instance_file) as f:
|
|
config = json.load(f)
|
|
else:
|
|
config = {}
|
|
if 'server_url' in data:
|
|
config['server_url'] = data['server_url']
|
|
if 'display_name' in data:
|
|
config['display_name'] = data['display_name']
|
|
if 'sync_secret' in data:
|
|
config['sync_secret'] = data['sync_secret']
|
|
with open(instance_file, 'w') as f:
|
|
json.dump(config, f, indent=2)
|
|
return jsonify({"status": "ok"})
|
|
|
|
@auth_bp.route('/api/sync/trigger', methods=['POST'])
|
|
@login_required
|
|
def trigger_sync():
|
|
from core.sync import SyncManager
|
|
try:
|
|
db_file = current_app.config.get('DB_FILE', '')
|
|
instance_id = current_app.config.get('INSTANCE_ID', '')
|
|
server_url = current_app.config.get('SERVER_URL', '')
|
|
sync_secret = ''
|
|
if db_file:
|
|
instance_file = os.path.join(os.path.dirname(db_file), 'instance.json')
|
|
if os.path.exists(instance_file):
|
|
with open(instance_file) as f:
|
|
sync_secret = json.load(f).get('sync_secret', '')
|
|
if server_url:
|
|
sm = SyncManager(db_file, instance_id, server_url, current_app.config.get('DISPLAY_NAME', ''), sync_secret)
|
|
sm._push()
|
|
sm._pull()
|
|
return jsonify({"status": "ok"})
|
|
return jsonify({"status": "error", "message": "No server URL configured"}), 400
|
|
except Exception as e:
|
|
return jsonify({"status": "error", "message": str(e)}), 500
|
|
|
|
def init_login_manager(app):
|
|
login_manager.init_app(app) |