luplo.core.sync.queue ===================== .. py:module:: luplo.core.sync.queue .. autoapi-nested-parse:: Debounce queue for external document sync jobs. When a page is updated multiple times in rapid succession (e.g. a writer editing a Notion doc), only a single sync job runs after a configurable debounce window. The ``sync_jobs`` table enforces at most one active (pending or processing) job per ``(source_type, source_page_id)`` via a partial unique index. Functions --------- .. autoapisummary:: luplo.core.sync.queue.enqueue_sync luplo.core.sync.queue.get_ready_sync_jobs luplo.core.sync.queue.complete_sync_job luplo.core.sync.queue.fail_sync_job Module Contents --------------- .. py:function:: enqueue_sync(conn: psycopg.AsyncConnection[Any], *, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) -> luplo.core.models.SyncJob :async: Add or merge a sync job into the debounce queue. If a pending/processing job already exists for the same page, its ``scheduled_at`` is bumped and ``payload`` is replaced (the latest content wins). Otherwise a new job is inserted. :param conn: Async psycopg connection. :param source_type: Origin system (e.g. ``"notion"``, ``"slack"``). :param source_page_id: External page/channel identifier. :param payload: Latest page content (markdown or raw text). :param source_event_id: External event ID for idempotency. :param debounce_seconds: Seconds to wait before processing (default 300). :returns: The created or updated ``SyncJob``. .. py:function:: get_ready_sync_jobs(conn: psycopg.AsyncConnection[Any], *, limit: int = 1) -> list[luplo.core.models.SyncJob] :async: Fetch and atomically claim sync jobs whose debounce window has passed. Claimed jobs are moved to ``processing`` status. Uses ``FOR UPDATE SKIP LOCKED`` so multiple workers can run safely. :param conn: Async psycopg connection. :param limit: Maximum jobs to claim (default 1). :returns: List of claimed ``SyncJob`` objects. .. py:function:: complete_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int) -> None :async: Mark a sync job as successfully completed. .. py:function:: fail_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int, *, error: str) -> None :async: Record a sync job failure. Increments ``attempts`` and sets ``last_error``. If the job has reached 3 attempts, its status is set to ``failed`` (permanently). Otherwise it goes back to ``pending`` for retry.