369 lines
17 KiB
Python
369 lines
17 KiB
Python
import threading
|
|
import requests
|
|
from datetime import datetime, timezone
|
|
|
|
POLL_INTERVAL = 30
|
|
BACKOFF_MULTIPLIER = 2
|
|
MAX_BACKOFF = 120
|
|
|
|
class SyncManager:
|
|
def __init__(self, db_file, instance_id, server_url, display_name="Desktop", sync_secret=""):
|
|
self.db_file = db_file
|
|
self.instance_id = instance_id
|
|
self.server_url = server_url.rstrip('/')
|
|
self.display_name = display_name
|
|
self._headers = {'X-Sync-Secret': sync_secret} if sync_secret else {}
|
|
self._stop = threading.Event()
|
|
self._last_push_at = None
|
|
self._last_sale_pull = None
|
|
self._last_debtor_pull = None
|
|
self._last_ticket_pull = None
|
|
self._last_dicom_pull = None
|
|
self._last_expense_pull = None
|
|
self._last_deletion_pull = None
|
|
|
|
def start(self):
|
|
thread = threading.Thread(target=self._run, daemon=True, name="sync-manager")
|
|
thread.start()
|
|
|
|
def stop(self):
|
|
self._stop.set()
|
|
|
|
def _run(self):
|
|
backoff = POLL_INTERVAL
|
|
while not self._stop.wait(backoff):
|
|
try:
|
|
if self._ping():
|
|
self._push()
|
|
self._pull()
|
|
backoff = POLL_INTERVAL
|
|
else:
|
|
backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
|
|
except Exception as e:
|
|
print(f"[Sync] Error: {e}")
|
|
backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
|
|
|
|
def _ping(self):
|
|
try:
|
|
r = requests.get(f"{self.server_url}/api/ping", headers=self._headers, timeout=5)
|
|
return r.ok
|
|
except requests.RequestException:
|
|
return False
|
|
|
|
def _push(self):
|
|
import sqlite3
|
|
conn = sqlite3.connect(self.db_file)
|
|
try:
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
cutoff = self._last_push_at or '1970-01-01'
|
|
|
|
# Sales (immutable, synced_at IS NULL)
|
|
sales = conn.execute(
|
|
"SELECT id, uuid, date, total, payment_method FROM sales WHERE synced_at IS NULL"
|
|
).fetchall()
|
|
|
|
sale_ids = [s[0] for s in sales]
|
|
sale_payloads = []
|
|
for s in sales:
|
|
items = conn.execute(
|
|
"SELECT barcode, name, price, quantity, subtotal FROM sale_items WHERE sale_id = ?",
|
|
(s[0],)
|
|
).fetchall()
|
|
sale_payloads.append({
|
|
"uuid": s[1], "date": s[2], "total": s[3], "payment_method": s[4],
|
|
"items": [{"barcode": i[0], "name": i[1], "price": i[2], "qty": i[3], "subtotal": i[4]} for i in items]
|
|
})
|
|
|
|
# Products (mutable, by updated_at)
|
|
products = conn.execute(
|
|
"SELECT barcode, name, price, image_url, stock, unit_type, updated_at FROM products WHERE updated_by = ? AND updated_at >= ?",
|
|
(self.instance_id, cutoff)
|
|
).fetchall()
|
|
|
|
# Debtors (mutable)
|
|
debtors = conn.execute(
|
|
"SELECT uuid, name, contact_info, updated_at FROM debtors WHERE updated_by = ? AND updated_at >= ?",
|
|
(self.instance_id, cutoff)
|
|
).fetchall()
|
|
|
|
# Dicom legacy (mutable)
|
|
dicom_rows = conn.execute(
|
|
"SELECT uuid, name, amount, notes, image_url, updated_at FROM dicom WHERE updated_by = ? AND updated_at >= ?",
|
|
(self.instance_id, cutoff)
|
|
).fetchall()
|
|
|
|
# Debtor tickets with items (mutable)
|
|
tickets_raw = conn.execute(
|
|
"SELECT id, uuid, debtor_id, date, total, amount_paid, status, updated_at FROM debtor_tickets WHERE updated_by = ? AND updated_at >= ?",
|
|
(self.instance_id, cutoff)
|
|
).fetchall()
|
|
ticket_payloads = []
|
|
for t in tickets_raw:
|
|
debtor_uuid = conn.execute("SELECT uuid FROM debtors WHERE id = ?", (t[2],)).fetchone()
|
|
items = conn.execute(
|
|
"SELECT barcode, name, price, quantity, subtotal FROM debtor_ticket_items WHERE ticket_id = ?",
|
|
(t[0],)
|
|
).fetchall()
|
|
ticket_payloads.append({
|
|
"uuid": t[1], "debtor_uuid": debtor_uuid[0] if debtor_uuid else '',
|
|
"date": t[3], "total": t[4], "amount_paid": t[5], "status": t[6],
|
|
"updated_at": t[7],
|
|
"items": [{"barcode": i[0], "name": i[1], "price": i[2], "qty": i[3], "subtotal": i[4]} for i in items]
|
|
})
|
|
|
|
# Expenses (immutable, synced_at IS NULL)
|
|
expenses = conn.execute(
|
|
"SELECT id, uuid, date, description, amount FROM expenses WHERE synced_at IS NULL"
|
|
).fetchall()
|
|
expense_ids = [e[0] for e in expenses]
|
|
|
|
# Deletions (immutable, synced_at IS NULL)
|
|
deletions = conn.execute(
|
|
"SELECT entity_type, entity_uuid FROM sync_deletions WHERE synced_at IS NULL"
|
|
).fetchall()
|
|
|
|
payload = {
|
|
"instance_id": self.instance_id,
|
|
"display_name": self.display_name,
|
|
"synced_at": now,
|
|
"sales": sale_payloads,
|
|
"products": [
|
|
{"barcode": p[0], "name": p[1], "price": p[2], "image_url": p[3], "stock": p[4], "unit_type": p[5], "updated_at": p[6]}
|
|
for p in products
|
|
],
|
|
"debtors": [
|
|
{"uuid": d[0], "name": d[1], "contact_info": d[2], "updated_at": d[3], "updated_by": self.instance_id}
|
|
for d in debtors
|
|
],
|
|
"dicom": [
|
|
{"uuid": d[0], "name": d[1], "amount": d[2], "notes": d[3], "image_url": d[4], "updated_at": d[5], "updated_by": self.instance_id}
|
|
for d in dicom_rows
|
|
],
|
|
"tickets": ticket_payloads,
|
|
"expenses": [
|
|
{"uuid": e[1], "date": e[2], "description": e[3], "amount": e[4]}
|
|
for e in expenses
|
|
],
|
|
"deletions": [
|
|
{"entity_type": d[0], "entity_uuid": d[1]}
|
|
for d in deletions
|
|
],
|
|
}
|
|
|
|
has_data = any([
|
|
sale_payloads, products, debtors, dicom_rows,
|
|
ticket_payloads, expenses, deletions
|
|
])
|
|
|
|
if has_data:
|
|
r = requests.post(f"{self.server_url}/api/sync/push", json=payload, headers=self._headers, timeout=15)
|
|
if r.ok:
|
|
if sale_ids:
|
|
conn.executemany(
|
|
"UPDATE sales SET synced_at = ? WHERE id = ?",
|
|
[(now, sid) for sid in sale_ids]
|
|
)
|
|
if expense_ids:
|
|
conn.executemany(
|
|
"UPDATE expenses SET synced_at = ? WHERE id = ?",
|
|
[(now, eid) for eid in expense_ids]
|
|
)
|
|
if deletions:
|
|
conn.executemany(
|
|
"UPDATE sync_deletions SET synced_at = ? WHERE entity_type = ? AND entity_uuid = ?",
|
|
[(now, d[0], d[1]) for d in deletions]
|
|
)
|
|
conn.commit()
|
|
self._last_push_at = now
|
|
print(f"[Sync] Pushed {len(sale_payloads)} sales, {len(products)} products, {len(debtors)} debtors, {len(dicom_rows)} dicom, {len(ticket_payloads)} tickets, {len(expenses)} expenses, {len(deletions)} deletions")
|
|
else:
|
|
print(f"[Sync] Push failed: HTTP {r.status_code} {r.text[:200]}")
|
|
else:
|
|
print(f"[Sync] Push: nothing to push")
|
|
finally:
|
|
conn.close()
|
|
|
|
def _pull(self):
|
|
import sqlite3
|
|
conn = sqlite3.connect(self.db_file)
|
|
try:
|
|
last_pull = conn.execute(
|
|
"SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM products WHERE updated_by != ?",
|
|
(self.instance_id,)
|
|
).fetchone()[0]
|
|
|
|
params = {
|
|
"since": last_pull,
|
|
"since_sales": self._last_sale_pull or '1970-01-01',
|
|
"since_debtors": self._last_debtor_pull or '1970-01-01',
|
|
"since_tickets": self._last_ticket_pull or '1970-01-01',
|
|
"since_dicom": self._last_dicom_pull or '1970-01-01',
|
|
"since_expenses": self._last_expense_pull or '1970-01-01',
|
|
"since_deletions": self._last_deletion_pull or '1970-01-01',
|
|
"instance_id": self.instance_id,
|
|
}
|
|
print(f"[Sync] Pull: {params}")
|
|
|
|
r = requests.get(f"{self.server_url}/api/sync/pull", params=params, headers=self._headers, timeout=15)
|
|
if not r.ok:
|
|
print(f"[Sync] Pull request failed: HTTP {r.status_code} {r.text[:200]}")
|
|
return
|
|
|
|
data = r.json()
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
|
|
# Apply deletions first
|
|
for d in data.get("deletions", []):
|
|
et, eu = d["entity_type"], d["entity_uuid"]
|
|
if et == 'debtor':
|
|
conn.execute("DELETE FROM debtors WHERE uuid = ?", (eu,))
|
|
elif et == 'ticket':
|
|
conn.execute("DELETE FROM debtor_tickets WHERE uuid = ?", (eu,))
|
|
elif et == 'dicom':
|
|
conn.execute("DELETE FROM dicom WHERE uuid = ?", (eu,))
|
|
elif et == 'expense':
|
|
conn.execute("DELETE FROM expenses WHERE uuid = ?", (eu,))
|
|
elif et == 'sale':
|
|
conn.execute("DELETE FROM sales WHERE uuid = ?", (eu,))
|
|
elif et == 'product':
|
|
conn.execute("DELETE FROM products WHERE barcode = ?", (eu,))
|
|
|
|
# Debtors (upsert by uuid)
|
|
pulled_debtors = 0
|
|
for d in data.get("debtors", []):
|
|
conn.execute('''INSERT INTO debtors (uuid, name, contact_info, updated_at, updated_by)
|
|
VALUES (?,?,?,?,?)
|
|
ON CONFLICT(uuid) DO UPDATE SET
|
|
name=excluded.name, contact_info=excluded.contact_info,
|
|
updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
|
|
(d["uuid"], d["name"], d.get("contact_info", ""),
|
|
d.get("updated_at", now), d.get("updated_by", "")))
|
|
pulled_debtors += 1
|
|
|
|
# Dicom (upsert by uuid)
|
|
pulled_dicom = 0
|
|
for d in data.get("dicom", []):
|
|
conn.execute('''INSERT INTO dicom (uuid, name, amount, notes, image_url, updated_at, updated_by)
|
|
VALUES (?,?,?,?,?,?,?)
|
|
ON CONFLICT(uuid) DO UPDATE SET
|
|
name=excluded.name, amount=excluded.amount,
|
|
notes=excluded.notes, image_url=excluded.image_url,
|
|
updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
|
|
(d["uuid"], d["name"], d.get("amount", 0), d.get("notes", ""),
|
|
d.get("image_url", ""), d.get("updated_at", now), d.get("updated_by", "")))
|
|
pulled_dicom += 1
|
|
|
|
# Debtor tickets (insert with items, dedup by uuid)
|
|
pulled_tickets = 0
|
|
for t in data.get("tickets", []):
|
|
existing = conn.execute(
|
|
"SELECT id FROM debtor_tickets WHERE uuid = ?", (t["uuid"],)
|
|
).fetchone()
|
|
if existing:
|
|
continue
|
|
debtor_row = conn.execute(
|
|
"SELECT id FROM debtors WHERE uuid = ?", (t["debtor_uuid"],)
|
|
).fetchone()
|
|
if not debtor_row:
|
|
continue
|
|
cur = conn.execute(
|
|
"INSERT INTO debtor_tickets (uuid, debtor_id, date, total, amount_paid, status, updated_at, updated_by) VALUES (?,?,?,?,?,?,?,?)",
|
|
(t["uuid"], debtor_row[0], t.get("date", now), t["total"],
|
|
t.get("amount_paid", 0), t.get("status", "unpaid"),
|
|
t.get("updated_at", now), t.get("updated_by", ""))
|
|
)
|
|
for item in t.get("items", []):
|
|
conn.execute(
|
|
"INSERT INTO debtor_ticket_items (ticket_id, barcode, name, price, quantity, subtotal) VALUES (?,?,?,?,?,?)",
|
|
(cur.lastrowid, item["barcode"], item["name"], item.get("price", 0),
|
|
item.get("qty", 1), item.get("subtotal", 0))
|
|
)
|
|
pulled_tickets += 1
|
|
|
|
# Expenses (insert, dedup by uuid)
|
|
pulled_expenses = 0
|
|
for e in data.get("expenses", []):
|
|
existing = conn.execute(
|
|
"SELECT id FROM expenses WHERE uuid = ?", (e["uuid"],)
|
|
).fetchone()
|
|
if existing:
|
|
continue
|
|
conn.execute(
|
|
"INSERT INTO expenses (uuid, date, description, amount, synced_at) VALUES (?,?,?,?,?)",
|
|
(e["uuid"], e.get("date", now), e["description"], e["amount"], now)
|
|
)
|
|
pulled_expenses += 1
|
|
|
|
# Products (upsert by barcode, skip if local is newer)
|
|
pulled_prods = 0
|
|
for p in data.get("products", []):
|
|
existing = conn.execute(
|
|
"SELECT updated_at FROM products WHERE barcode = ?", (p["barcode"],)
|
|
).fetchone()
|
|
if existing and existing[0] >= p["updated_at"]:
|
|
continue
|
|
conn.execute('''INSERT INTO products (barcode, name, price, image_url, stock, unit_type, updated_at, updated_by)
|
|
VALUES (?,?,?,?,?,?,?,?)
|
|
ON CONFLICT(barcode) DO UPDATE SET
|
|
name=excluded.name, price=excluded.price,
|
|
image_url=excluded.image_url, stock=excluded.stock,
|
|
unit_type=excluded.unit_type, updated_at=excluded.updated_at,
|
|
updated_by=excluded.updated_by''',
|
|
(p["barcode"], p["name"], p["price"], p.get("image_url"),
|
|
p.get("stock", 0), p.get("unit_type", "unit"),
|
|
p["updated_at"], p.get("updated_by", "")))
|
|
pulled_prods += 1
|
|
|
|
# Sales (insert with items, dedup by uuid)
|
|
pulled_sales = 0
|
|
for sale in data.get("sales", []):
|
|
existing = conn.execute(
|
|
"SELECT id FROM sales WHERE uuid = ?", (sale["uuid"],)
|
|
).fetchone()
|
|
if existing:
|
|
continue
|
|
cur = conn.execute(
|
|
"INSERT INTO sales (date, total, payment_method, uuid, synced_at) VALUES (?, ?, ?, ?, ?)",
|
|
(sale.get("date", now), sale["total"], sale.get("payment_method", "efectivo"),
|
|
sale["uuid"], now)
|
|
)
|
|
for item in sale.get("items", []):
|
|
conn.execute(
|
|
"INSERT INTO sale_items (sale_id, barcode, name, price, quantity, subtotal) VALUES (?, ?, ?, ?, ?, ?)",
|
|
(cur.lastrowid, item["barcode"], item["name"], item.get("price", 0),
|
|
item.get("qty", 1), item.get("subtotal", 0))
|
|
)
|
|
pulled_sales += 1
|
|
|
|
conn.commit()
|
|
|
|
# Update pull timestamps
|
|
all_sales = data.get("sales", [])
|
|
all_debtors = data.get("debtors", [])
|
|
all_tickets = data.get("tickets", [])
|
|
all_dicom = data.get("dicom", [])
|
|
all_expenses = data.get("expenses", [])
|
|
all_deletions = data.get("deletions", [])
|
|
|
|
if all_sales:
|
|
self._last_sale_pull = max(s["date"] for s in all_sales)
|
|
if all_debtors:
|
|
self._last_debtor_pull = max(d["updated_at"] for d in all_debtors)
|
|
if all_tickets:
|
|
self._last_ticket_pull = max(t["updated_at"] for t in all_tickets)
|
|
if all_dicom:
|
|
self._last_dicom_pull = max(d["updated_at"] for d in all_dicom)
|
|
if all_expenses:
|
|
self._last_expense_pull = max(e["date"] for e in all_expenses)
|
|
if all_deletions:
|
|
self._last_deletion_pull = max(d["deleted_at"] for d in all_deletions)
|
|
|
|
if pulled_prods or pulled_sales or pulled_debtors or pulled_dicom or pulled_tickets or pulled_expenses:
|
|
print(f"[Sync] Pulled {pulled_prods} products, {pulled_sales} sales, {pulled_debtors} debtors, {pulled_dicom} dicom, {pulled_tickets} tickets, {pulled_expenses} expenses")
|
|
else:
|
|
print(f"[Sync] Pull: nothing new to apply")
|
|
except Exception as e:
|
|
print(f"[Sync] Pull error: {e}")
|
|
finally:
|
|
conn.close()
|