luplo.core.sync.queue

Debounce queue for external document sync jobs.

When a page is updated multiple times in rapid succession (e.g. a writer editing a Notion doc), only a single sync job runs after a configurable debounce window. The sync_jobs table enforces at most one active (pending or processing) job per (source_type, source_page_id) via a partial unique index.

Functions

enqueue_sync(→ luplo.core.models.SyncJob)

Add or merge a sync job into the debounce queue.

get_ready_sync_jobs(→ list[luplo.core.models.SyncJob])

Fetch and atomically claim sync jobs whose debounce window has passed.

complete_sync_job(→ None)

Mark a sync job as successfully completed.

fail_sync_job(→ None)

Record a sync job failure.

Module Contents

async luplo.core.sync.queue.enqueue_sync(conn: psycopg.AsyncConnection[Any], *, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) luplo.core.models.SyncJob

Add or merge a sync job into the debounce queue.

If a pending/processing job already exists for the same page, its scheduled_at is bumped and payload is replaced (the latest content wins). Otherwise a new job is inserted.

Parameters:
  • conn – Async psycopg connection.

  • source_type – Origin system (e.g. "notion", "slack").

  • source_page_id – External page/channel identifier.

  • payload – Latest page content (markdown or raw text).

  • source_event_id – External event ID for idempotency.

  • debounce_seconds – Seconds to wait before processing (default 300).

Returns:

The created or updated SyncJob.

async luplo.core.sync.queue.get_ready_sync_jobs(conn: psycopg.AsyncConnection[Any], *, limit: int = 1) list[luplo.core.models.SyncJob]

Fetch and atomically claim sync jobs whose debounce window has passed.

Claimed jobs are moved to processing status. Uses FOR UPDATE SKIP LOCKED so multiple workers can run safely.

Parameters:
  • conn – Async psycopg connection.

  • limit – Maximum jobs to claim (default 1).

Returns:

List of claimed SyncJob objects.

async luplo.core.sync.queue.complete_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int) None

Mark a sync job as successfully completed.

async luplo.core.sync.queue.fail_sync_job(conn: psycopg.AsyncConnection[Any], job_id: int, *, error: str) None

Record a sync job failure.

Increments attempts and sets last_error. If the job has reached 3 attempts, its status is set to failed (permanently). Otherwise it goes back to pending for retry.