import sqlite3
import threading
from contextlib import closing
from datetime import datetime, timezone

import requests

POLL_INTERVAL = 30
BACKOFF_MULTIPLIER = 2
MAX_BACKOFF = 120

# Lower bound used for every "since" cursor that has not been set yet.
_EPOCH = '1970-01-01'

# Top-level collections of the push payload, in the order they are logged.
_PUSH_ENTITIES = ("sales", "products", "debtors", "dicom", "tickets", "expenses", "deletions")

_SALE_ITEMS_SQL = (
    "SELECT barcode, name, price, quantity, subtotal FROM sale_items WHERE sale_id = ?"
)
_TICKET_ITEMS_SQL = (
    "SELECT barcode, name, price, quantity, subtotal FROM debtor_ticket_items WHERE ticket_id = ?"
)

# entity_type -> single DELETE statement keyed by the deletion's entity_uuid.
_DELETE_SQL = {
    'debtor': "DELETE FROM debtors WHERE uuid = ?",
    'dicom': "DELETE FROM dicom WHERE uuid = ?",
    'expense': "DELETE FROM expenses WHERE uuid = ?",
    'product': "DELETE FROM products WHERE barcode = ?",
}

# entity_type -> (find parent id, delete child items, delete parent).
_DELETE_WITH_ITEMS_SQL = {
    'ticket': (
        "SELECT id FROM debtor_tickets WHERE uuid = ?",
        "DELETE FROM debtor_ticket_items WHERE ticket_id = ?",
        "DELETE FROM debtor_tickets WHERE uuid = ?",
    ),
    'sale': (
        "SELECT id FROM sales WHERE uuid = ?",
        "DELETE FROM sale_items WHERE sale_id = ?",
        "DELETE FROM sales WHERE uuid = ?",
    ),
}

# (cursor attribute, payload collection, timestamp field) for each pull cursor.
_PULL_CURSORS = (
    ("_last_sale_pull", "sales", "date"),
    ("_last_debtor_pull", "debtors", "updated_at"),
    ("_last_ticket_pull", "tickets", "updated_at"),
    ("_last_dicom_pull", "dicom", "updated_at"),
    ("_last_expense_pull", "expenses", "date"),
    ("_last_deletion_pull", "deletions", "deleted_at"),
)


class SyncManager:
    """Periodically push local SQLite changes to a sync server and pull remote ones.

    A daemon thread wakes every ``POLL_INTERVAL`` seconds, pings the server
    and, when it answers, runs one push followed by one pull. When the ping
    fails or a cycle raises, the wait is multiplied by ``BACKOFF_MULTIPLIER``
    up to ``MAX_BACKOFF`` seconds.

    The push and pull cursors are kept on the instance only, so a new
    instance starts again from ``'1970-01-01'``.
    """

    def __init__(self, db_file, instance_id, server_url, display_name="Desktop", sync_secret=""):
        """Store the configuration; no I/O is performed here.

        Args:
            db_file: Database path handed to ``sqlite3.connect``.
            instance_id: Identifier of this installation; it is sent to the
                server and compared with the ``updated_by`` columns.
            server_url: Base URL of the sync server; trailing slashes are
                stripped.
            display_name: Name sent with every push.
            sync_secret: When non-empty, sent as the ``X-Sync-Secret`` header
                on every request.
        """
        self.db_file = db_file
        self.instance_id = instance_id
        self.server_url = server_url.rstrip('/')
        self.display_name = display_name
        self._headers = {'X-Sync-Secret': sync_secret} if sync_secret else {}
        self._stop = threading.Event()
        self._last_push_at = None
        self._last_sale_pull = None
        self._last_debtor_pull = None
        self._last_ticket_pull = None
        self._last_dicom_pull = None
        self._last_expense_pull = None
        self._last_deletion_pull = None

    def start(self):
        """Launch the polling loop in a daemon thread named ``sync-manager``."""
        thread = threading.Thread(target=self._run, daemon=True, name="sync-manager")
        thread.start()

    def stop(self):
        """Ask the polling loop to exit the next time it checks the stop event."""
        self._stop.set()

    def _run(self):
        """Polling loop: wait, ping, push, pull; back off on failure."""
        delay = POLL_INTERVAL
        # Event.wait() returns True as soon as stop() sets the event, which
        # ends the loop; a timeout (False) means the next cycle is due.
        while not self._stop.wait(delay):
            try:
                if self._ping():
                    self._push()
                    self._pull()
                    delay = POLL_INTERVAL
                else:
                    delay = min(delay * BACKOFF_MULTIPLIER, MAX_BACKOFF)
            except Exception as e:
                # Boundary of the worker thread: report and retry later
                # instead of letting the thread die.
                print(f"[Sync] Error: {e}")
                delay = min(delay * BACKOFF_MULTIPLIER, MAX_BACKOFF)

    def _ping(self):
        """Return True when ``GET /api/ping`` answers with a success status."""
        try:
            r = requests.get(f"{self.server_url}/api/ping", headers=self._headers, timeout=5)
            return r.ok
        except requests.RequestException:
            return False

    # ------------------------------------------------------------------ push

    def _push(self):
        """Send pending local changes to ``POST /api/sync/push``.

        Sales, expenses and deletions are selected by ``synced_at IS NULL``
        and stamped with the push time once the server accepts them.
        Products, debtors, dicom rows and tickets are selected by
        ``updated_by`` = this instance and ``updated_at`` >= the last
        successful push time.

        Exceptions from SQLite or ``requests`` propagate to the caller.
        """
        with closing(sqlite3.connect(self.db_file)) as conn:
            now = datetime.now(timezone.utc).isoformat()
            cutoff = self._last_push_at or _EPOCH
            payload, sale_ids, expense_ids, deletions = self._build_push_payload(conn, now, cutoff)

            if not any(payload[entity] for entity in _PUSH_ENTITIES):
                print("[Sync] Push: nothing to push")
                return

            r = requests.post(f"{self.server_url}/api/sync/push", json=payload,
                              headers=self._headers, timeout=15)
            if not r.ok:
                print(f"[Sync] Push failed: HTTP {r.status_code} {r.text[:200]}")
                return

            self._mark_pushed(conn, now, sale_ids, expense_ids, deletions)
            self._last_push_at = now
            n = {entity: len(payload[entity]) for entity in _PUSH_ENTITIES}
            print(f"[Sync] Pushed {n['sales']} sales, {n['products']} products, "
                  f"{n['debtors']} debtors, {n['dicom']} dicom, {n['tickets']} tickets, "
                  f"{n['expenses']} expenses, {n['deletions']} deletions")

    @staticmethod
    def _item_payloads(conn, sql, parent_id):
        """Run *sql* for one parent row and return its line items as payload dicts."""
        return [
            {"barcode": barcode, "name": name, "price": price, "qty": quantity, "subtotal": subtotal}
            for barcode, name, price, quantity, subtotal in conn.execute(sql, (parent_id,)).fetchall()
        ]

    def _build_push_payload(self, conn, now, cutoff):
        """Read everything that must be pushed and shape it as the request body.

        Returns:
            ``(payload, sale_ids, expense_ids, deletions)`` where the last
            three identify the rows to stamp as synced after a successful push.
        """
        owned_since = (self.instance_id, cutoff)

        # Sales (immutable, synced_at IS NULL)
        sales = conn.execute(
            "SELECT id, uuid, date, total, payment_method FROM sales WHERE synced_at IS NULL"
        ).fetchall()
        sale_ids = [row[0] for row in sales]
        sale_payloads = [
            {
                "uuid": uuid, "date": date, "total": total, "payment_method": payment_method,
                "items": self._item_payloads(conn, _SALE_ITEMS_SQL, sale_id),
            }
            for sale_id, uuid, date, total, payment_method in sales
        ]

        # Products (mutable, by updated_at)
        products = conn.execute(
            "SELECT barcode, name, price, image_url, stock, unit_type, updated_at "
            "FROM products WHERE updated_by = ? AND updated_at >= ?",
            owned_since
        ).fetchall()

        # Debtors (mutable)
        debtors = conn.execute(
            "SELECT uuid, name, contact_info, updated_at "
            "FROM debtors WHERE updated_by = ? AND updated_at >= ?",
            owned_since
        ).fetchall()

        # Dicom legacy (mutable)
        dicom_rows = conn.execute(
            "SELECT uuid, name, amount, notes, image_url, updated_at "
            "FROM dicom WHERE updated_by = ? AND updated_at >= ?",
            owned_since
        ).fetchall()

        # Debtor tickets with items (mutable)
        tickets = conn.execute(
            "SELECT id, uuid, debtor_id, date, total, amount_paid, status, updated_at "
            "FROM debtor_tickets WHERE updated_by = ? AND updated_at >= ?",
            owned_since
        ).fetchall()
        ticket_payloads = []
        for ticket_id, uuid, debtor_id, date, total, amount_paid, status, updated_at in tickets:
            debtor = conn.execute("SELECT uuid FROM debtors WHERE id = ?", (debtor_id,)).fetchone()
            ticket_payloads.append({
                "uuid": uuid,
                # The server identifies debtors by uuid; an empty string is
                # sent when the local debtor row cannot be found.
                "debtor_uuid": debtor[0] if debtor else '',
                "date": date, "total": total, "amount_paid": amount_paid,
                "status": status, "updated_at": updated_at,
                "items": self._item_payloads(conn, _TICKET_ITEMS_SQL, ticket_id),
            })

        # Expenses (immutable, synced_at IS NULL)
        expenses = conn.execute(
            "SELECT id, uuid, date, description, amount FROM expenses WHERE synced_at IS NULL"
        ).fetchall()
        expense_ids = [row[0] for row in expenses]

        # Deletions (immutable, synced_at IS NULL)
        deletions = conn.execute(
            "SELECT entity_type, entity_uuid FROM sync_deletions WHERE synced_at IS NULL"
        ).fetchall()

        payload = {
            "instance_id": self.instance_id,
            "display_name": self.display_name,
            "synced_at": now,
            "sales": sale_payloads,
            "products": [
                {"barcode": p[0], "name": p[1], "price": p[2], "image_url": p[3],
                 "stock": p[4], "unit_type": p[5], "updated_at": p[6]}
                for p in products
            ],
            "debtors": [
                {"uuid": d[0], "name": d[1], "contact_info": d[2], "updated_at": d[3],
                 "updated_by": self.instance_id}
                for d in debtors
            ],
            "dicom": [
                {"uuid": d[0], "name": d[1], "amount": d[2], "notes": d[3], "image_url": d[4],
                 "updated_at": d[5], "updated_by": self.instance_id}
                for d in dicom_rows
            ],
            "tickets": ticket_payloads,
            "expenses": [
                {"uuid": e[1], "date": e[2], "description": e[3], "amount": e[4]}
                for e in expenses
            ],
            "deletions": [
                {"entity_type": d[0], "entity_uuid": d[1]}
                for d in deletions
            ],
        }
        return payload, sale_ids, expense_ids, deletions

    @staticmethod
    def _mark_pushed(conn, now, sale_ids, expense_ids, deletions):
        """Stamp the pushed sales, expenses and deletions with *now* and commit."""
        if sale_ids:
            conn.executemany(
                "UPDATE sales SET synced_at = ? WHERE id = ?",
                [(now, sale_id) for sale_id in sale_ids]
            )
        if expense_ids:
            conn.executemany(
                "UPDATE expenses SET synced_at = ? WHERE id = ?",
                [(now, expense_id) for expense_id in expense_ids]
            )
        if deletions:
            conn.executemany(
                "UPDATE sync_deletions SET synced_at = ? WHERE entity_type = ? AND entity_uuid = ?",
                [(now, d[0], d[1]) for d in deletions]
            )
        conn.commit()

    # ------------------------------------------------------------------ pull

    def _pull(self):
        """Fetch remote changes from ``GET /api/sync/pull`` and apply them locally.

        Deletions are applied first, then debtors, dicom rows, tickets,
        expenses, products and sales, all committed together. The in-memory
        pull cursors advance only after the commit.

        Any exception raised after the connection is opened is printed and
        swallowed, so a failing pull does not trigger the caller's backoff.
        Nothing is committed in that case unless the failure happens after
        the commit.
        """
        with closing(sqlite3.connect(self.db_file)) as conn:
            try:
                # The product cursor is derived from the data itself: the
                # newest product row that was written by another instance.
                last_pull = conn.execute(
                    "SELECT COALESCE(MAX(updated_at), '1970-01-01') FROM products WHERE updated_by != ?",
                    (self.instance_id,)
                ).fetchone()[0]

                params = {
                    "since": last_pull,
                    "since_sales": self._last_sale_pull or _EPOCH,
                    "since_debtors": self._last_debtor_pull or _EPOCH,
                    "since_tickets": self._last_ticket_pull or _EPOCH,
                    "since_dicom": self._last_dicom_pull or _EPOCH,
                    "since_expenses": self._last_expense_pull or _EPOCH,
                    "since_deletions": self._last_deletion_pull or _EPOCH,
                    "instance_id": self.instance_id,
                }
                print(f"[Sync] Pull: {params}")

                r = requests.get(f"{self.server_url}/api/sync/pull", params=params,
                                 headers=self._headers, timeout=15)
                if not r.ok:
                    print(f"[Sync] Pull request failed: HTTP {r.status_code} {r.text[:200]}")
                    return

                data = r.json()
                now = datetime.now(timezone.utc).isoformat()

                # Apply deletions first
                self._apply_deletions(conn, data.get("deletions") or [])
                pulled_debtors = self._upsert_debtors(conn, data.get("debtors") or [], now)
                pulled_dicom = self._upsert_dicom(conn, data.get("dicom") or [], now)
                pulled_tickets = self._insert_tickets(conn, data.get("tickets") or [], now)
                pulled_expenses = self._insert_expenses(conn, data.get("expenses") or [], now)
                pulled_prods = self._upsert_products(conn, data.get("products") or [])
                pulled_sales = self._insert_sales(conn, data.get("sales") or [], now)
                conn.commit()

                self._advance_pull_cursors(data)

                if (pulled_prods or pulled_sales or pulled_debtors
                        or pulled_dicom or pulled_tickets or pulled_expenses):
                    print(f"[Sync] Pulled {pulled_prods} products, {pulled_sales} sales, "
                          f"{pulled_debtors} debtors, {pulled_dicom} dicom, "
                          f"{pulled_tickets} tickets, {pulled_expenses} expenses")
                else:
                    print("[Sync] Pull: nothing new to apply")
            except Exception as e:
                # Best-effort by design: report and let the next cycle retry.
                # Closing the connection discards any uncommitted changes.
                print(f"[Sync] Pull error: {e}")

    @staticmethod
    def _apply_deletions(conn, deletions):
        """Delete the local rows named by the server; unknown entity types are ignored."""
        for deletion in deletions:
            entity_type, entity_uuid = deletion["entity_type"], deletion["entity_uuid"]
            if entity_type in _DELETE_WITH_ITEMS_SQL:
                find_sql, delete_items_sql, delete_parent_sql = _DELETE_WITH_ITEMS_SQL[entity_type]
                parent = conn.execute(find_sql, (entity_uuid,)).fetchone()
                if parent:
                    # Child items reference the parent's integer id, not its uuid.
                    conn.execute(delete_items_sql, (parent[0],))
                conn.execute(delete_parent_sql, (entity_uuid,))
            elif entity_type in _DELETE_SQL:
                conn.execute(_DELETE_SQL[entity_type], (entity_uuid,))

    @staticmethod
    def _upsert_debtors(conn, debtors, now):
        """Insert or overwrite debtors by uuid; return how many were applied."""
        for d in debtors:
            conn.execute(
                '''INSERT INTO debtors (uuid, name, contact_info, updated_at, updated_by)
                   VALUES (?,?,?,?,?)
                   ON CONFLICT(uuid) DO UPDATE SET
                     name=excluded.name, contact_info=excluded.contact_info,
                     updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
                (d["uuid"], d["name"], d.get("contact_info", ""),
                 d.get("updated_at", now), d.get("updated_by", "")))
        return len(debtors)

    @staticmethod
    def _upsert_dicom(conn, rows, now):
        """Insert or overwrite dicom rows by uuid; return how many were applied."""
        for d in rows:
            conn.execute(
                '''INSERT INTO dicom (uuid, name, amount, notes, image_url, updated_at, updated_by)
                   VALUES (?,?,?,?,?,?,?)
                   ON CONFLICT(uuid) DO UPDATE SET
                     name=excluded.name, amount=excluded.amount, notes=excluded.notes,
                     image_url=excluded.image_url, updated_at=excluded.updated_at,
                     updated_by=excluded.updated_by''',
                (d["uuid"], d["name"], d.get("amount", 0), d.get("notes", ""),
                 d.get("image_url", ""), d.get("updated_at", now), d.get("updated_by", "")))
        return len(rows)

    @staticmethod
    def _insert_tickets(conn, tickets, now):
        """Insert tickets (with items) that are not known locally; return the count."""
        inserted = 0
        for t in tickets:
            known = conn.execute(
                "SELECT id FROM debtor_tickets WHERE uuid = ?", (t["uuid"],)
            ).fetchone()
            if known:
                # NOTE(review): tickets are pushed as mutable (by updated_at),
                # yet a remote change to an existing ticket is dropped here.
                # Confirm whether this should be an upsert.
                continue
            debtor = conn.execute(
                "SELECT id FROM debtors WHERE uuid = ?", (t["debtor_uuid"],)
            ).fetchone()
            if not debtor:
                # NOTE(review): the ticket cursor still advances past this
                # ticket, so it is not requested again by this instance.
                continue
            cur = conn.execute(
                "INSERT INTO debtor_tickets "
                "(uuid, debtor_id, date, total, amount_paid, status, updated_at, updated_by) "
                "VALUES (?,?,?,?,?,?,?,?)",
                (t["uuid"], debtor[0], t.get("date", now), t["total"],
                 t.get("amount_paid", 0), t.get("status", "unpaid"),
                 t.get("updated_at", now), t.get("updated_by", ""))
            )
            ticket_id = cur.lastrowid
            for item in t.get("items", []):
                conn.execute(
                    "INSERT INTO debtor_ticket_items "
                    "(ticket_id, barcode, name, price, quantity, subtotal) VALUES (?,?,?,?,?,?)",
                    (ticket_id, item["barcode"], item["name"], item.get("price", 0),
                     item.get("qty", 1), item.get("subtotal", 0))
                )
            inserted += 1
        return inserted

    @staticmethod
    def _insert_expenses(conn, expenses, now):
        """Insert expenses not known locally, already marked as synced; return the count."""
        inserted = 0
        for e in expenses:
            known = conn.execute(
                "SELECT id FROM expenses WHERE uuid = ?", (e["uuid"],)
            ).fetchone()
            if known:
                continue
            conn.execute(
                "INSERT INTO expenses (uuid, date, description, amount, synced_at) VALUES (?,?,?,?,?)",
                (e["uuid"], e.get("date", now), e["description"], e["amount"], now)
            )
            inserted += 1
        return inserted

    @staticmethod
    def _upsert_products(conn, products):
        """Upsert products by barcode unless the local row is at least as new; return the count."""
        applied = 0
        for p in products:
            local = conn.execute(
                "SELECT updated_at FROM products WHERE barcode = ?", (p["barcode"],)
            ).fetchone()
            # A local row whose updated_at is NULL cannot be compared with a
            # string; treat it as older so the server version is applied
            # instead of aborting the whole pull with a TypeError.
            if local and local[0] is not None and local[0] >= p["updated_at"]:
                continue
            conn.execute(
                '''INSERT INTO products
                     (barcode, name, price, image_url, stock, unit_type, updated_at, updated_by)
                   VALUES (?,?,?,?,?,?,?,?)
                   ON CONFLICT(barcode) DO UPDATE SET
                     name=excluded.name, price=excluded.price, image_url=excluded.image_url,
                     stock=excluded.stock, unit_type=excluded.unit_type,
                     updated_at=excluded.updated_at, updated_by=excluded.updated_by''',
                (p["barcode"], p["name"], p["price"], p.get("image_url"), p.get("stock", 0),
                 p.get("unit_type", "unit"), p["updated_at"], p.get("updated_by", "")))
            applied += 1
        return applied

    @staticmethod
    def _insert_sales(conn, sales, now):
        """Insert sales (with items) not known locally, already marked as synced; return the count."""
        inserted = 0
        for sale in sales:
            known = conn.execute(
                "SELECT id FROM sales WHERE uuid = ?", (sale["uuid"],)
            ).fetchone()
            if known:
                continue
            cur = conn.execute(
                "INSERT INTO sales (date, total, payment_method, uuid, synced_at) VALUES (?, ?, ?, ?, ?)",
                (sale.get("date", now), sale["total"], sale.get("payment_method", "efectivo"),
                 sale["uuid"], now)
            )
            sale_id = cur.lastrowid
            for item in sale.get("items", []):
                conn.execute(
                    "INSERT INTO sale_items (sale_id, barcode, name, price, quantity, subtotal) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (sale_id, item["barcode"], item["name"], item.get("price", 0),
                     item.get("qty", 1), item.get("subtotal", 0))
                )
            inserted += 1
        return inserted

    def _advance_pull_cursors(self, data):
        """Move each pull cursor to the newest timestamp present in *data*.

        Records that lack the timestamp field are ignored instead of raising:
        the rows have already been committed at this point, and a KeyError
        here would leave every cursor stale.
        """
        for attr, entity, field in _PULL_CURSORS:
            stamps = [
                row[field] for row in (data.get(entity) or [])
                if row.get(field) is not None
            ]
            if stamps:
                setattr(self, attr, max(stamps))