luplo.core.sync.queue
Debounce queue for external document sync jobs.
When a page is updated multiple times in rapid succession (e.g. a writer
editing a Notion doc), only a single sync job runs after a configurable
debounce window. The sync_jobs table enforces at most one active
(pending or processing) job per (source_type, source_page_id) via a
partial unique index.
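The uniqueness rule can be tried out locally with a small sketch. It uses the standard-library sqlite3 module, which also supports partial unique indexes, instead of the PostgreSQL database the module targets. The column set, the index name sync_jobs_one_active, and the 'completed' status value are assumptions made for illustration; the real schema is not shown on this page.

```python
import sqlite3

# Hypothetical, simplified schema. The real sync_jobs table lives in
# PostgreSQL; its columns and index name are assumptions here.
SCHEMA = """
CREATE TABLE sync_jobs (
    id INTEGER PRIMARY KEY,
    source_type TEXT NOT NULL,
    source_page_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
);
CREATE UNIQUE INDEX sync_jobs_one_active
    ON sync_jobs (source_type, source_page_id)
    WHERE status IN ('pending', 'processing');
"""


def insert_job(db, source_type, source_page_id, status="pending"):
    db.execute(
        "INSERT INTO sync_jobs (source_type, source_page_id, status) "
        "VALUES (?, ?, ?)",
        (source_type, source_page_id, status),
    )


def demo():
    db = sqlite3.connect(":memory:")
    db.executescript(SCHEMA)

    insert_job(db, "notion", "page-1")
    try:
        # A second active job for the same page violates the partial index.
        insert_job(db, "notion", "page-1")
        second_active_allowed = True
    except sqlite3.IntegrityError:
        second_active_allowed = False

    # Finished jobs fall outside the index predicate, so a new active job
    # for the same page can be inserted afterwards.
    db.execute("UPDATE sync_jobs SET status = 'completed'")
    insert_job(db, "notion", "page-1")
    total = db.execute("SELECT COUNT(*) FROM sync_jobs").fetchone()[0]
    return second_active_allowed, total
```

Because the index only covers rows whose status is pending or processing, finished jobs can accumulate as history without blocking new work for the same page.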
Functions
| enqueue_sync | Add or merge a sync job into the debounce queue. |
| get_ready_sync_jobs | Fetch and atomically claim sync jobs whose debounce window has passed. |
| complete_sync_job | Mark a sync job as successfully completed. |
| fail_sync_job | Record a sync job failure. |
Module Contents
- async luplo.core.sync.queue.enqueue_sync(conn: psycopg.AsyncConnection[Any], *, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) -> luplo.core.models.SyncJob
Add or merge a sync job into the debounce queue.
If a pending or processing job already exists for the same page, its
scheduled_at is bumped and payload is replaced (the latest content wins). Otherwise a new job is inserted.
- Parameters:
conn – Async psycopg connection.
source_type – Origin system (e.g. "notion", "slack").
source_page_id – External page/channel identifier.
payload – Latest page content (markdown or raw text).
source_event_id – External event ID for idempotency.
debounce_seconds – Seconds to wait before processing (default 300).
- Returns:
The created or updated SyncJob.
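The merge behaviour described above can be modelled in memory to make the debounce semantics concrete. This is a sketch, not the library's implementation: Job and InMemoryQueue are hypothetical stand-ins, and it assumes that "bumped" means scheduled_at is reset to the current time plus debounce_seconds. It only models pending jobs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Job:
    """Hypothetical stand-in for luplo.core.models.SyncJob."""
    source_type: str
    source_page_id: str
    payload: Optional[str]
    scheduled_at: datetime
    status: str = "pending"


class InMemoryQueue:
    """Toy model of the debounce queue, keyed by (source_type, source_page_id)."""

    def __init__(self):
        self.active = {}

    def enqueue(self, source_type, source_page_id, payload=None, *,
                now, debounce_seconds=300):
        key = (source_type, source_page_id)
        # Assumption: the window restarts from the time of the latest update.
        scheduled_at = now + timedelta(seconds=debounce_seconds)
        job = self.active.get(key)
        if job is None:
            # No active job for this page: insert a new one.
            job = Job(source_type, source_page_id, payload, scheduled_at)
            self.active[key] = job
        else:
            # Merge: push the schedule back and keep only the latest payload.
            job.scheduled_at = scheduled_at
            job.payload = payload
        return job


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
queue = InMemoryQueue()
first = queue.enqueue("notion", "page-1", "draft 1", now=t0)
second = queue.enqueue("notion", "page-1", "draft 2",
                       now=t0 + timedelta(seconds=60))
```

In this model the two calls return the same job object: an edit one minute after the first leaves a single job, carrying the second payload and scheduled six minutes after the first edit.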
- async luplo.core.sync.queue.get_ready_sync_jobs(conn: psycopg.AsyncConnection[Any], *, limit: int = 1) -> list[luplo.core.models.SyncJob]
Fetch and atomically claim sync jobs whose debounce window has passed.
Claimed jobs are moved to processing status. Uses FOR UPDATE SKIP LOCKED so multiple workers can claim jobs concurrently without picking up the same job.
- Parameters:
conn – Async psycopg connection.
limit – Maximum jobs to claim (default 1).
- Returns:
List of claimed SyncJob objects.
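A worker built on these functions might look like the sketch below. run_worker_once and handle are hypothetical names, and the sketch assumes a SyncJob exposes an id attribute. The queue functions are passed in as parameters so the example runs without a database; in real use one would presumably pass get_ready_sync_jobs, complete_sync_job, and fail_sync_job from this module. Transaction handling is left out.

```python
import asyncio
from types import SimpleNamespace


async def run_worker_once(conn, handle, *, get_ready, complete, fail, limit=1):
    """Claim up to `limit` ready jobs, process each, and report the outcome."""
    jobs = await get_ready(conn, limit=limit)
    for job in jobs:
        try:
            await handle(job)
        except Exception as exc:
            # Hand the error text to the queue so it can decide on a retry.
            await fail(conn, job.id, error=str(exc))
        else:
            await complete(conn, job.id)
    return len(jobs)


async def demo():
    """Exercise the loop with in-memory fakes instead of a real connection."""
    calls = []

    async def fake_get_ready(conn, *, limit=1):
        jobs = [
            SimpleNamespace(id=1, payload="ok"),
            SimpleNamespace(id=2, payload="boom"),
        ]
        return jobs[:limit]

    async def fake_complete(conn, job_id):
        calls.append(("complete", job_id))

    async def fake_fail(conn, job_id, *, error):
        calls.append(("fail", job_id, error))

    async def handle(job):
        if job.payload == "boom":
            raise RuntimeError("bad page")

    claimed = await run_worker_once(
        None, handle,
        get_ready=fake_get_ready, complete=fake_complete, fail=fake_fail,
        limit=2,
    )
    return claimed, calls
```

Each claimed job ends in exactly one call to either the completion or the failure function, which keeps the job from staying in processing after the handler returns.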
- async luplo.core.sync.queue.complete_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int) -> None
Mark a sync job as successfully completed.
- async luplo.core.sync.queue.fail_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int, *, error: str) -> None
Record a sync job failure.
Increments attempts and sets last_error. If the job has reached 3 attempts, its status is set to failed (permanently). Otherwise it goes back to pending for retry.
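The retry rule can be written as a small state transition. This is a model of the documented behaviour rather than the actual SQL: JobState and record_failure are hypothetical names, and it assumes the 3-attempt check happens after attempts is incremented.

```python
from dataclasses import dataclass
from typing import Optional

MAX_ATTEMPTS = 3  # limit stated in the description above


@dataclass
class JobState:
    """Hypothetical subset of the fields a sync job carries."""
    attempts: int = 0
    status: str = "processing"
    last_error: Optional[str] = None


def record_failure(job: JobState, error: str) -> JobState:
    """Apply one failure: count it, store the error, choose the next status."""
    job.attempts += 1
    job.last_error = error
    # Assumption: the limit is compared against the incremented counter.
    job.status = "failed" if job.attempts >= MAX_ATTEMPTS else "pending"
    return job


job = JobState()
statuses = [record_failure(job, f"timeout #{n}").status for n in (1, 2, 3)]
```

Under these assumptions the first two failures return the job to pending and the third marks it failed, with last_error holding the most recent message.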