moved stuff + scanner com/hid edits

This commit is contained in:
2026-03-17 17:25:14 -03:00
parent 9cf0792866
commit 0fcb8ce473
1228 changed files with 382 additions and 151555 deletions

Binary file not shown.

View File

@@ -0,0 +1,164 @@
import os
import json
import requests
import barcode
import urllib3
import re
from barcode.writer import ImageWriter
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
# --- SETTINGS ---
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
JSON_FILE = os.path.join(os.getcwd(), 'curated_list.json')
CARD_DIR = os.path.join(os.getcwd(), 'keychain_cards')
IMG_CACHE_DIR = os.path.join(os.getcwd(), 'image_cache')
OUTPUT_PDF = os.path.join(os.getcwd(), 'keychain_3x3_perfect.pdf')
# A4 at 300 DPI
PAGE_W, PAGE_H = 2480, 3508
COLS, ROWS = 3, 3
PAGE_MARGIN = 150
# Ensure directories exist
for d in [CARD_DIR, IMG_CACHE_DIR]:
os.makedirs(d, exist_ok=True)
def clean_filename(name):
return re.sub(r'[\\/*?:"<>|]', "_", name)
def get_ean_from_plu(plu):
return f"000000{str(plu).zfill(4)}00"
def get_cached_image(url, plu):
cache_path = os.path.join(IMG_CACHE_DIR, f"{plu}.jpg")
# Si el archivo ya existe en el cache, lo usamos sin importar la URL
if os.path.exists(cache_path):
return cache_path
# Si no existe y la URL es un placeholder, no podemos descargar nada
if url == "URL_PLACEHOLDER":
print(f"⚠️ {plu} tiene placeholder y no se encontró en {IMG_CACHE_DIR}")
return None
# Lógica de descarga original para URLs reales
try:
headers = {'User-Agent': 'Mozilla/5.0'}
res = requests.get(url, headers=headers, timeout=10, verify=False)
if res.status_code == 200:
with open(cache_path, 'wb') as f:
f.write(res.content)
return cache_path
except Exception as e:
print(f"❌ Error descargando {plu}: {e}")
return None
def generate_card(item):
name = item['name']
plu = item['plu']
img_url = item['image']
safe_name = clean_filename(name).replace(' ', '_')
final_path = os.path.join(CARD_DIR, f"PLU_{plu}_{safe_name}.png")
if os.path.exists(final_path):
return final_path
local_img_path = get_cached_image(img_url, plu)
card = Image.new('RGB', (300, 450), color='white')
draw = ImageDraw.Draw(card)
draw.rectangle([0, 0, 299, 449], outline="black", width=3)
if local_img_path:
try:
img = Image.open(local_img_path).convert("RGB")
w, h = img.size
size = min(w, h)
img = img.crop(((w-size)//2, (h-size)//2, (w+size)//2, (h+size)//2))
img = img.resize((200, 200), Image.Resampling.LANCZOS)
card.paste(img, (50, 40))
except:
draw.text((150, 140), "[IMG ERROR]", anchor="mm", fill="red")
else:
draw.text((150, 140), "[NOT FOUND]", anchor="mm", fill="red")
try:
f_name = ImageFont.truetype("arialbd.ttf", 22)
f_plu = ImageFont.truetype("arial.ttf", 18)
except:
f_name = f_plu = ImageFont.load_default()
draw.text((150, 260), name.upper(), fill="black", font=f_name, anchor="mm")
draw.text((150, 295), f"PLU: {plu}", fill="#333333", font=f_plu, anchor="mm")
EAN = barcode.get_barcode_class('ean13')
ean = EAN(get_ean_from_plu(plu), writer=ImageWriter())
tmp = f"tmp_{plu}"
ean.save(tmp, options={'module_height': 12.0, 'font_size': 10, 'text_distance': 4})
if os.path.exists(f"{tmp}.png"):
b_img = Image.open(f"{tmp}.png")
b_img = b_img.resize((280, 120))
card.paste(b_img, (10, 320))
os.remove(f"{tmp}.png")
card.save(final_path)
print(f" - Card created: {name} ({plu})")
return final_path
def create_pdf():
all_files = sorted([f for f in os.listdir(CARD_DIR) if f.endswith('.png')])
if not all_files:
print("❌ No cards found to put in PDF.")
return
available_w = PAGE_W - (PAGE_MARGIN * 2)
available_h = PAGE_H - (PAGE_MARGIN * 2)
slot_w, slot_h = available_w // COLS, available_h // ROWS
target_w = int(slot_w * 0.9)
target_h = int(target_w * (450 / 300))
if target_h > (slot_h * 0.9):
target_h = int(slot_h * 0.9)
target_w = int(target_h * (300 / 450))
pages = []
current_page = Image.new('RGB', (PAGE_W, PAGE_H), 'white')
print(f"📄 Organizing {len(all_files)} cards into {COLS}x{ROWS} grid...")
for i, filename in enumerate(all_files):
item_idx = i % (COLS * ROWS)
if item_idx == 0 and i > 0:
pages.append(current_page)
current_page = Image.new('RGB', (PAGE_W, PAGE_H), 'white')
row, col = item_idx // COLS, item_idx % COLS
img_path = os.path.join(CARD_DIR, filename)
card_img = Image.open(img_path).convert('RGB')
card_img = card_img.resize((target_w, target_h), Image.Resampling.LANCZOS)
x = PAGE_MARGIN + (col * slot_w) + (slot_w - target_w) // 2
y = PAGE_MARGIN + (row * slot_h) + (slot_h - target_h) // 2
current_page.paste(card_img, (x, y))
pages.append(current_page)
pages[0].save(OUTPUT_PDF, save_all=True, append_images=pages[1:], resolution=300.0, quality=100)
print(f"✅ Created {OUTPUT_PDF}. Now go print it and stop crying.")
if __name__ == "__main__":
if not os.path.exists(JSON_FILE):
print(f"❌ Missing {JSON_FILE}")
else:
with open(JSON_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"Step 1: Processing {len(data)} cards...")
for entry in data:
generate_card(entry)
print("\nStep 2: Generating PDF...")
create_pdf()

View File

@@ -0,0 +1,51 @@
import pandas as pd
import json
import os
file_path = os.path.join(os.getcwd(), 'PLU+FSMA+list+v1.0.xlsx')
sheet_name = 'Non FTL'
new_url_base = "https://server-ifps.accurateig.com/assets/commodities/"
def get_one_of_each():
if not os.path.exists(file_path):
print("❌ Excel file not found.")
return
# 1. Load Excel
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 2. Drop rows missing the essentials
df = df.dropna(subset=['IMAGE', 'PLU', 'COMMODITY'])
# 3. CRITICAL: Drop duplicates by COMMODITY only
# This ignores Variety and Size, giving us exactly one row per fruit type.
df_unique = df.drop_duplicates(subset=['COMMODITY'], keep='first')
data_output = []
for _, row in df_unique.iterrows():
# Extract filename from the messy URL in Excel
original_link = str(row['IMAGE'])
filename = original_link.split('/')[-1]
# Build the final working URL
image_url = f"{new_url_base}{filename}"
# Get the clean Commodity name
commodity = str(row['COMMODITY']).title()
plu_code = str(row['PLU'])
data_output.append({
"name": commodity,
"plu": plu_code,
"image": image_url
})
# 4. Save to JSON
with open('one_of_each.json', 'w', encoding='utf-8') as f:
json.dump(data_output, f, indent=4, ensure_ascii=False)
print(f"✅ Success! Generated 'one_of_each.json' with {len(data_output)} unique commodities.")
if __name__ == "__main__":
get_one_of_each()

View File

@@ -0,0 +1,41 @@
import serial
import requests
import time
# --- CONFIGURATION ---
COM_PORT = 'COM5' # Change to /dev/ttyUSB0 on Linux
BAUD_RATE = 115200
# The IP of the PC running your Flask WebUI
SERVER_URL = "https://scanner.sekidesu.xyz/scan" # Change to your server's URL
def run_bridge():
try:
# Initialize serial connection
ser = serial.Serial(COM_PORT, BAUD_RATE, timeout=0.1)
print(f"Connected to {COM_PORT} at {BAUD_RATE} bauds.")
print("Ready to scan. Try not to break it.")
while True:
# Read line from scanner (most scanners send \r or \n at the end)
if ser.in_waiting > 0:
barcode = ser.readline().decode('utf-8').strip()
if barcode:
print(f"Scanned: {barcode}")
try:
# Send to your existing Flask server
# We use the same parameter 'content' so your server doesn't know the difference
resp = requests.get(SERVER_URL, params={'content': barcode})
print(f"Server responded: {resp.status_code}")
except Exception as e:
print(f"Failed to send to server: {e}")
time.sleep(0.01) # Don't melt your CPU
except serial.SerialException as e:
print(f"Error opening {COM_PORT}: {e}")
except KeyboardInterrupt:
print("\nBridge stopped by user. Quitter.")
if __name__ == "__main__":
run_bridge()

View File

@@ -0,0 +1,36 @@
import serial
import requests
import time
import argparse
import sys
def run_bridge():
parser = argparse.ArgumentParser(description="Scanner Bridge for the technically impaired")
parser.add_argument('--port', default='COM5', help='Serial port (default: COM5)')
parser.add_argument('--baud', type=int, default=115200, help='Baud rate (default: 115200)')
parser.add_argument('--url', default='https://scanner.sekidesu.xyz/scan', help='Server URL')
args = parser.parse_args()
try:
ser = serial.Serial(args.port, args.baud, timeout=0.1)
print(f"Connected to {args.port} at {args.baud} bauds.")
while True:
if ser.in_waiting > 0:
barcode = ser.readline().decode('utf-8', errors='ignore').strip()
if barcode:
print(f"Scanned: {barcode}")
try:
resp = requests.get(args.url, params={'content': barcode}, timeout=5)
print(f"Server responded: {resp.status_code}")
except Exception as e:
print(f"Failed to send to server: {e}")
time.sleep(0.01)
except serial.SerialException as e:
print(f"Error opening {args.port}: {e}")
except KeyboardInterrupt:
print("\nBridge stopped. Finally.")
if __name__ == "__main__":
run_bridge()

View File

@@ -0,0 +1,155 @@
import tkinter as tk
from tkinter import ttk
import threading
import requests
import usb.core
import usb.util
import usb.backend.libusb1
import os
import time
VENDOR_ID = 0xFFFF
PRODUCT_ID = 0x0035
HID_MAP = {
4: 'a', 5: 'b', 6: 'c', 7: 'd', 8: 'e', 9: 'f', 10: 'g', 11: 'h', 12: 'i',
13: 'j', 14: 'k', 15: 'l', 16: 'm', 17: 'n', 18: 'o', 19: 'p', 20: 'q',
21: 'r', 22: 's', 23: 't', 24: 'u', 25: 'v', 26: 'w', 27: 'x', 28: 'y', 29: 'z',
30: '1', 31: '2', 32: '3', 33: '4', 34: '5', 35: '6', 36: '7', 37: '8', 38: '9', 39: '0',
44: ' ', 45: '-', 46: '=', 55: '.', 56: '/'
}
class POSBridgeApp:
def __init__(self, root):
self.root = root
self.root.title("POS Hardware Bridge")
self.root.geometry("500x320")
self.running = True
# UI Setup
ttk.Label(root, text="Target POS Endpoint:").pack(pady=(15, 2))
self.url_var = tk.StringVar(value="https://scanner.sekidesu.xyz/scan")
self.url_entry = ttk.Entry(root, textvariable=self.url_var, width=60)
self.url_entry.pack(pady=5)
self.status_var = tk.StringVar(value="Status: Booting...")
self.status_label = ttk.Label(root, textvariable=self.status_var, font=("Segoe UI", 10, "bold"))
self.status_label.pack(pady=10)
ttk.Label(root, text="Activity Log:").pack()
self.log_listbox = tk.Listbox(root, width=70, height=8, bg="#1e1e1e", fg="#00ff00", font=("Consolas", 9))
self.log_listbox.pack(pady=5, padx=10)
# Bind the close button to kill threads cleanly
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
# Fire up the USB listener in a background thread
self.usb_thread = threading.Thread(target=self.usb_listen_loop, daemon=True)
self.usb_thread.start()
def log(self, message):
# Tkinter requires GUI updates to happen on the main thread
self.root.after(0, self._append_log, message)
def _append_log(self, message):
self.log_listbox.insert(0, time.strftime("[%H:%M:%S] ") + message)
if self.log_listbox.size() > 15:
self.log_listbox.delete(15)
def update_status(self, text, color="black"):
self.root.after(0, self._set_status, text, color)
def _set_status(self, text, color):
self.status_var.set(f"Status: {text}")
self.status_label.config(foreground=color)
def on_close(self):
self.running = False
self.root.destroy()
def send_to_pos(self, barcode):
url = self.url_var.get()
self.log(f"Captured: {barcode}. Sending...")
try:
resp = requests.get(url, params={'content': barcode}, timeout=3)
self.log(f"Success: POS returned {resp.status_code}")
except requests.RequestException as e:
self.log(f"HTTP Error: Backend unreachable")
def usb_listen_loop(self):
import sys
# PyInstaller extracts files to a temp _MEIPASS folder at runtime
if getattr(sys, 'frozen', False):
base_path = sys._MEIPASS
else:
base_path = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(base_path, "libusb-1.0.dll")
if not os.path.exists(dll_path):
self.update_status(f"CRITICAL: DLL missing at {dll_path}", "red")
return
backend = usb.backend.libusb1.get_backend(find_library=lambda x: dll_path)
while self.running:
# Reconnect loop
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, backend=backend)
if dev is None:
self.update_status("Scanner unplugged. Waiting...", "red")
time.sleep(2)
continue
try:
dev.set_configuration()
cfg = dev.get_active_configuration()
intf = cfg[(0,0)]
endpoint = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
)
self.update_status("Scanner Locked & Ready", "green")
current_barcode = ""
# Active reading loop
while self.running:
try:
data = dev.read(endpoint.bEndpointAddress, endpoint.wMaxPacketSize, timeout=1000)
keycode = data[2]
modifier = data[0]
is_shift = modifier == 2 or modifier == 32
if keycode == 0:
continue
if keycode == 40: # Enter key signifies end of scan
if current_barcode:
# Spawn a micro-thread for the HTTP request so we don't block the next scan
threading.Thread(target=self.send_to_pos, args=(current_barcode,), daemon=True).start()
current_barcode = ""
elif keycode in HID_MAP:
char = HID_MAP[keycode]
if is_shift and char.isalpha():
char = char.upper()
current_barcode += char
except usb.core.USBError as e:
# 10060/110 are normal timeouts when no barcode is being actively scanned
if e.args[0] in (10060, 110):
continue
else:
self.log(f"Hardware interrupt lost. Reconnecting...")
break # Breaks inner loop to trigger outer reconnect loop
except Exception as e:
self.log(f"USB Error: {e}")
time.sleep(2) # Prevent rapid crash loops
if __name__ == '__main__':
# You must run pip install requests if you haven't already
root = tk.Tk()
app = POSBridgeApp(root)
root.mainloop()

Binary file not shown.

View File

@@ -0,0 +1,25 @@
import usb.core
import usb.backend.libusb1
import os
# Grab the exact path to the DLL in your current folder
current_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(current_dir, "libusb-1.0.dll")
if not os.path.exists(dll_path):
print(f"I don't see the DLL at: {dll_path}")
exit(1)
# Force pyusb to use this specific file
backend = usb.backend.libusb1.get_backend(find_library=lambda x: dll_path)
print("Scanning with forced local DLL backend...")
devices = usb.core.find(find_all=True, backend=backend)
found = False
for d in devices:
found = True
print(f"Found Device -> VID: {hex(d.idVendor)} PID: {hex(d.idProduct)}")
if not found:
print("Python is still blind. The DLL might be the wrong architecture (32-bit vs 64-bit).")

View File

@@ -0,0 +1,85 @@
import usb.core
import usb.util
import usb.backend.libusb1
import os
import sys
# Your exact scanner IDs
VENDOR_ID = 0xFFFF
PRODUCT_ID = 0x0035
# Basic HID to ASCII translation dictionary
HID_MAP = {
4: 'a', 5: 'b', 6: 'c', 7: 'd', 8: 'e', 9: 'f', 10: 'g', 11: 'h', 12: 'i',
13: 'j', 14: 'k', 15: 'l', 16: 'm', 17: 'n', 18: 'o', 19: 'p', 20: 'q',
21: 'r', 22: 's', 23: 't', 24: 'u', 25: 'v', 26: 'w', 27: 'x', 28: 'y', 29: 'z',
30: '1', 31: '2', 32: '3', 33: '4', 34: '5', 35: '6', 36: '7', 37: '8', 38: '9', 39: '0',
44: ' ', 45: '-', 46: '=', 55: '.', 56: '/'
}
def main():
# Force the local DLL backend
current_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(current_dir, "libusb-1.0.dll")
if not os.path.exists(dll_path):
print(f"Error: Missing {dll_path}")
sys.exit(1)
backend = usb.backend.libusb1.get_backend(find_library=lambda x: dll_path)
# Find the scanner using the forced backend
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, backend=backend)
if dev is None:
print("Scanner not found. Check Zadig driver again.")
sys.exit(1)
# Claim device
dev.set_configuration()
cfg = dev.get_active_configuration()
intf = cfg[(0,0)]
endpoint = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
)
print("Scanner locked. Waiting for barcodes...")
current_barcode = ""
while True:
try:
# Read 8 bytes from the scanner
data = dev.read(endpoint.bEndpointAddress, endpoint.wMaxPacketSize, timeout=1000)
keycode = data[2]
modifier = data[0]
is_shift = modifier == 2 or modifier == 32
if keycode == 0:
continue
if keycode == 40: # Enter key
print(f"Captured Barcode: {current_barcode}")
current_barcode = ""
elif keycode in HID_MAP:
char = HID_MAP[keycode]
if is_shift and char.isalpha():
char = char.upper()
current_barcode += char
except usb.core.USBError as e:
if e.args[0] == 10060 or e.args[0] == 110:
continue
else:
print(f"USB Error: {e}")
break
except KeyboardInterrupt:
print("\nExiting and releasing scanner...")
usb.util.dispose_resources(dev)
break
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,29 @@
import sqlite3
DB_FILE = 'db/pos_database.db'
def upgrade_db():
try:
with sqlite3.connect(DB_FILE) as conn:
<<<<<<< HEAD
=======
# Add stock column
# conn.execute("ALTER TABLE products ADD COLUMN stock REAL DEFAULT 0")
# print("Successfully added 'stock' column.")
# # App.py also expects unit_type, adding it to prevent future headaches
# conn.execute("ALTER TABLE products ADD COLUMN unit_type TEXT DEFAULT 'unit'")
# print("Successfully added 'unit_type' column.")
>>>>>>> 1a048a0e074ee26bd45dda9731c78c2ecef42fba
conn.execute("ALTER TABLE dicom ADD COLUMN image_url TEXT;")
print("Successfully added 'image_url' column.")
conn.commit()
print("Migration complete. Your data is intact.")
except sqlite3.OperationalError as e:
print(f"Skipped: {e}. (This usually means the columns already exist, so you're fine).")
if __name__ == '__main__':
upgrade_db()