Files
SekiPOS/core/sync.py
SekiDesu01 3fb95c1617 modified: blueprints/inventory.py
modified:   blueprints/sync_server.py
	modified:   core/sync.py
2026-06-23 17:05:35 -04:00

363 lines
17 KiB
Python

import threading
import requests
from datetime import datetime, timezone
POLL_INTERVAL = 30
BACKOFF_MULTIPLIER = 2
MAX_BACKOFF = 120
class SyncManager:
def __init__(self, db_file, instance_id, server_url, display_name="Desktop", sync_secret=""):
self.db_file = db_file
self.instance_id = instance_id
self.server_url = server_url.rstrip('/')
self.display_name = display_name
self._headers = {'X-Sync-Secret': sync_secret} if sync_secret else {}
self._stop = threading.Event()
self._last_push_at = None
self._last_sale_pull = None
self._last_debtor_pull = None
self._last_ticket_pull = None
self._last_dicom_pull = None
self._last_expense_pull = None
self._last_deletion_pull = None
def start(self):
thread = threading.Thread(target=self._run, daemon=True, name="sync-manager")
thread.start()
def stop(self):
self._stop.set()
def _run(self):
backoff = POLL_INTERVAL
while not self._stop.wait(backoff):
try:
if self._ping():
self._push()
self._pull()
backoff = POLL_INTERVAL
else:
backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
except Exception as e:
print(f"[Sync] Error: {e}")
backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
def _ping(self):
try:
r = requests.get(f"{self.server_url}/api/ping", headers=self._headers, timeout=5)
return r.ok
except requests.RequestException:
return False
def _push(self):
import sqlite3
conn = sqlite3.connect(self.db_file)
try:
now = datetime.now(timezone.utc).isoformat()
cutoff = self._last_push_at or '1970-01-01'
# Sales (immutable, synced_at IS NULL)
sales = conn.execute(
"SELECT id, uuid, date, total, payment_method FROM sales WHERE synced_at IS NULL"
).fetchall()
sale_ids = [s[0] for s in sales]
sale_payloads = []
for s in sales:
items = conn.execute(
"SELECT barcode, name, price, quantity, subtotal FROM sale_items WHERE sale_id = ?",
(s[0],)
).fetchall()
sale_payloads.append({
"uuid": s[1], "date": s[2], "total": s[3], "payment_method": s[4],
"items": [{"barcode": i[0], "name": i[1], "price": i[2], "qty": i[3], "subtotal": i[4]} for i in items]
})
# Products (mutable, by updated_at)
products = conn.execute(
"SELECT barcode, name, price, image_url, stock, unit_type, updated_at FROM products WHERE updated_by = ? AND updated_at >= ?",
(self.instance_id, cutoff)
).fetchall()
# Debtors (mutable)
debtors = conn.execute(
"SELECT uuid, name, contact_info, updated_at FROM debtors WHERE updated_by = ? AND updated_at >= ?",
(self.instance_id, cutoff)
).fetchall()
# Dicom legacy (mutable)
dicom_rows = conn.execute(
"SELECT uuid, name, amount, notes, image_url, updated_at FROM dicom WHERE updated_by = ? AND updated_at >= ?",
(self.instance_id, cutoff)
).fetchall()
# Debtor tickets with items (mutable)
tickets_raw = conn.execute(
"SELECT id, uuid, debtor_id, date, total, amount_paid, status, updated_at FROM debtor_tickets WHERE updated_by = ? AND updated_at >= ?",
(self.instance_id, cutoff)
).fetchall()
ticket_payloads = []
for t in tickets_raw:
debtor_uuid = conn.execute("SELECT uuid FROM debtors WHERE id = ?", (t[2],)).fetchone()
items = conn.execute(
"SELECT barcode, name, price, quantity, subtotal FROM debtor_ticket_items WHERE ticket_id = ?",
(t[0],)
).fetchall()
ticket_payloads.append({
"uuid": t[1], "debtor_uuid": debtor_uuid[0] if debtor_uuid else '',
"date": t[3], "total": t[4], "amount_paid": t[5], "status": t[6],
"updated_at": t[7],
"items": [{"barcode": i[0], "name": i[1], "price": i[2], "qty": i[3], "subtotal": i[4]} for i in items]
})
# Expenses (immutable, synced_at IS NULL)
expenses = conn.execute(
"SELECT uuid, date, description, amount FROM expenses WHERE synced_at IS NULL"
).fetchall()
# Deletions (immutable, synced_at IS NULL)
deletions = conn.execute(
"SELECT entity_type, entity_uuid FROM sync_deletions WHERE synced_at IS NULL"
).fetchall()
payload = {
"instance_id": self.instance_id,
"display_name": self.display_name,
"synced_at": now,
"sales": sale_payloads,
"products": [
{"barcode": p[0], "name": p[1], "price": p[2], "image_url": p[3], "stock": p[4], "unit_type": p[5], "updated_at": p[6]}
for p in products
],
"debtors": [
{"uuid": d[0], "name": d[1], "contact_info": d[2], "updated_at": d[3], "updated_by": self.instance_id}
for d in debtors
],
"dicom": [
{"uuid": d[0], "name": d[1], "amount": d[2], "notes": d[3], "image_url": d[4], "updated_at": d[5], "updated_by": self.instance_id}
for d in dicom_rows
],
"tickets": ticket_payloads,
"expenses": [
{"uuid": e[0], "date": e[1], "description": e[2], "amount": e[3]}
for e in expenses
],
"deletions": [
{"entity_type": d[0], "entity_uuid": d[1]}
for d in deletions
],
}
has_data = any([
sale_payloads, products, debtors, dicom_rows,
ticket_payloads, expenses, deletions
])
if has_data:
r = requests.post(f"{self.server_url}/api/sync/push", json=payload, headers=self._headers, timeout=15)
if r.ok:
if sale_ids:
conn.executemany(
"UPDATE sales SET synced_at = ? WHERE id = ?",
[(now, sid) for sid in sale_ids]
)
if deletions:
conn.executemany(
"UPDATE sync_deletions SET synced_at = ? WHERE entity_type = ? AND entity_uuid = ?",
[(now, d[0], d[1]) for d in deletions]
)
conn.commit()
self._last_push_at = now
print(f"[Sync] Pushed {len(sale_payloads)} sales, {len(products)} products, {len(debtors)} debtors, {len(dicom_rows)} dicom, {len(ticket_payloads)} tickets, {len(expenses)} expenses, {len(deletions)} deletions")
else:
print(f"[Sync] Push failed: HTTP {r.status_code} {r.text[:200]}")
else:
print(f"[Sync] Push: nothing to push")
finally:
conn.close()
def _pull(self):
import sqlite3
conn = sqlite3.connect(self.db_file)
try:
last_pull = conn.execute(
"SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM products WHERE updated_by != ?",
(self.instance_id,)
).fetchone()[0]
params = {
"since": last_pull,
"since_sales": self._last_sale_pull or '1970-01-01',
"since_debtors": self._last_debtor_pull or '1970-01-01',
"since_tickets": self._last_ticket_pull or '1970-01-01',
"since_dicom": self._last_dicom_pull or '1970-01-01',
"since_expenses": self._last_expense_pull or '1970-01-01',
"since_deletions": self._last_deletion_pull or '1970-01-01',
"instance_id": self.instance_id,
}
print(f"[Sync] Pull: {params}")
r = requests.get(f"{self.server_url}/api/sync/pull", params=params, headers=self._headers, timeout=15)
if not r.ok:
print(f"[Sync] Pull request failed: HTTP {r.status_code} {r.text[:200]}")
return
data = r.json()
now = datetime.now(timezone.utc).isoformat()
# Apply deletions first
for d in data.get("deletions", []):
et, eu = d["entity_type"], d["entity_uuid"]
if et == 'debtor':
conn.execute("DELETE FROM debtors WHERE uuid = ?", (eu,))
elif et == 'ticket':
conn.execute("DELETE FROM debtor_tickets WHERE uuid = ?", (eu,))
elif et == 'dicom':
conn.execute("DELETE FROM dicom WHERE uuid = ?", (eu,))
elif et == 'expense':
conn.execute("DELETE FROM expenses WHERE uuid = ?", (eu,))
elif et == 'sale':
conn.execute("DELETE FROM sales WHERE uuid = ?", (eu,))
elif et == 'product':
conn.execute("DELETE FROM products WHERE barcode = ?", (eu,))
# Debtors (upsert by uuid)
pulled_debtors = 0
for d in data.get("debtors", []):
conn.execute('''INSERT INTO debtors (uuid, name, contact_info, updated_at, updated_by)
VALUES (?,?,?,?,?)
ON CONFLICT(uuid) DO UPDATE SET
name=excluded.name, contact_info=excluded.contact_info,
updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
(d["uuid"], d["name"], d.get("contact_info", ""),
d.get("updated_at", now), d.get("updated_by", "")))
pulled_debtors += 1
# Dicom (upsert by uuid)
pulled_dicom = 0
for d in data.get("dicom", []):
conn.execute('''INSERT INTO dicom (uuid, name, amount, notes, image_url, updated_at, updated_by)
VALUES (?,?,?,?,?,?,?)
ON CONFLICT(uuid) DO UPDATE SET
name=excluded.name, amount=excluded.amount,
notes=excluded.notes, image_url=excluded.image_url,
updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
(d["uuid"], d["name"], d.get("amount", 0), d.get("notes", ""),
d.get("image_url", ""), d.get("updated_at", now), d.get("updated_by", "")))
pulled_dicom += 1
# Debtor tickets (insert with items, dedup by uuid)
pulled_tickets = 0
for t in data.get("tickets", []):
existing = conn.execute(
"SELECT id FROM debtor_tickets WHERE uuid = ?", (t["uuid"],)
).fetchone()
if existing:
continue
debtor_row = conn.execute(
"SELECT id FROM debtors WHERE uuid = ?", (t["debtor_uuid"],)
).fetchone()
if not debtor_row:
continue
cur = conn.execute(
"INSERT INTO debtor_tickets (uuid, debtor_id, date, total, amount_paid, status, updated_at, updated_by) VALUES (?,?,?,?,?,?,?,?)",
(t["uuid"], debtor_row[0], t.get("date", now), t["total"],
t.get("amount_paid", 0), t.get("status", "unpaid"),
t.get("updated_at", now), t.get("updated_by", ""))
)
for item in t.get("items", []):
conn.execute(
"INSERT INTO debtor_ticket_items (ticket_id, barcode, name, price, quantity, subtotal) VALUES (?,?,?,?,?,?)",
(cur.lastrowid, item["barcode"], item["name"], item.get("price", 0),
item.get("qty", 1), item.get("subtotal", 0))
)
pulled_tickets += 1
# Expenses (insert, dedup by uuid)
pulled_expenses = 0
for e in data.get("expenses", []):
existing = conn.execute(
"SELECT id FROM expenses WHERE uuid = ?", (e["uuid"],)
).fetchone()
if existing:
continue
conn.execute(
"INSERT INTO expenses (uuid, date, description, amount, synced_at) VALUES (?,?,?,?,?)",
(e["uuid"], e.get("date", now), e["description"], e["amount"], now)
)
pulled_expenses += 1
# Products (upsert by barcode, skip if local is newer)
pulled_prods = 0
for p in data.get("products", []):
existing = conn.execute(
"SELECT updated_at FROM products WHERE barcode = ?", (p["barcode"],)
).fetchone()
if existing and existing[0] >= p["updated_at"]:
continue
conn.execute('''INSERT INTO products (barcode, name, price, image_url, stock, unit_type, updated_at, updated_by)
VALUES (?,?,?,?,?,?,?,?)
ON CONFLICT(barcode) DO UPDATE SET
name=excluded.name, price=excluded.price,
image_url=excluded.image_url, stock=excluded.stock,
unit_type=excluded.unit_type, updated_at=excluded.updated_at,
updated_by=excluded.updated_by''',
(p["barcode"], p["name"], p["price"], p.get("image_url"),
p.get("stock", 0), p.get("unit_type", "unit"),
p["updated_at"], p.get("updated_by", "")))
pulled_prods += 1
# Sales (insert with items, dedup by uuid)
pulled_sales = 0
for sale in data.get("sales", []):
existing = conn.execute(
"SELECT id FROM sales WHERE uuid = ?", (sale["uuid"],)
).fetchone()
if existing:
continue
cur = conn.execute(
"INSERT INTO sales (date, total, payment_method, uuid, synced_at) VALUES (?, ?, ?, ?, ?)",
(sale.get("date", now), sale["total"], sale.get("payment_method", "efectivo"),
sale["uuid"], now)
)
for item in sale.get("items", []):
conn.execute(
"INSERT INTO sale_items (sale_id, barcode, name, price, quantity, subtotal) VALUES (?, ?, ?, ?, ?, ?)",
(cur.lastrowid, item["barcode"], item["name"], item.get("price", 0),
item.get("qty", 1), item.get("subtotal", 0))
)
pulled_sales += 1
conn.commit()
# Update pull timestamps
all_sales = data.get("sales", [])
all_debtors = data.get("debtors", [])
all_tickets = data.get("tickets", [])
all_dicom = data.get("dicom", [])
all_expenses = data.get("expenses", [])
all_deletions = data.get("deletions", [])
if all_sales:
self._last_sale_pull = max(s["date"] for s in all_sales)
if all_debtors:
self._last_debtor_pull = max(d["updated_at"] for d in all_debtors)
if all_tickets:
self._last_ticket_pull = max(t["updated_at"] for t in all_tickets)
if all_dicom:
self._last_dicom_pull = max(d["updated_at"] for d in all_dicom)
if all_expenses:
self._last_expense_pull = max(e["date"] for e in all_expenses)
if all_deletions:
self._last_deletion_pull = max(d["deleted_at"] for d in all_deletions)
if pulled_prods or pulled_sales or pulled_debtors or pulled_dicom or pulled_tickets or pulled_expenses:
print(f"[Sync] Pulled {pulled_prods} products, {pulled_sales} sales, {pulled_debtors} debtors, {pulled_dicom} dicom, {pulled_tickets} tickets, {pulled_expenses} expenses")
else:
print(f"[Sync] Pull: nothing new to apply")
except Exception as e:
print(f"[Sync] Pull error: {e}")
finally:
conn.close()