luplo.core.backend

Backend abstraction — Local (direct PG) or Remote (HTTP).

Submodules

Classes

LocalBackend

Backend that talks directly to PostgreSQL via a connection pool.

Backend

Structural protocol (typing.Protocol) that defines the backend interface; LocalBackend is the direct-PostgreSQL implementation.

Package Contents

class luplo.core.backend.LocalBackend(pool: psycopg_pool.AsyncConnectionPool, embedding: luplo.core.embedding.EmbeddingBackend | None = None)

Backend that talks directly to PostgreSQL via a connection pool.

This is the primary backend for lp init --local (single-user mode) and for the FastAPI server’s internal use.

Parameters:
  • pool – An open AsyncConnectionPool.

  • embedding – Embedding backend for search reranking. Defaults to NullEmbedding (no vectors).

pool
async create_project(*, id: str, name: str, description: str | None = None) luplo.core.models.Project
async get_project(id: str) luplo.core.models.Project | None
async list_projects() list[luplo.core.models.Project]
async create_actor(*, id: str, name: str, email: str | None = None, role: str | None = None, external_ids: dict[str, str] | None = None) luplo.core.models.Actor
async get_actor(id: str) luplo.core.models.Actor | None
async list_item_types() list[luplo.core.models.ItemType]
async get_item_type(key: str) luplo.core.models.ItemType | None
async create_item_type(*, key: str, display_name: str, schema: dict[str, Any], owner: str = 'user') luplo.core.models.ItemType
async get_actor_by_email(email: str) luplo.core.models.Actor | None
async open_work_unit(*, id: str, project_id: str, title: str, description: str | None = None, system_ids: list[str] | None = None, created_by: str | None = None, context: dict[str, Any] | None = None) luplo.core.models.WorkUnit
async get_work_unit(id: str, *, project_id: str | None = None) luplo.core.models.WorkUnit | None
async list_work_units(project_id: str, *, status: str | None = None) list[luplo.core.models.WorkUnit]
async close_work_unit(id: str, *, actor_id: str, force: bool = False) luplo.core.models.WorkUnit | None
async archive_work_unit(*, id: str, archived_by: str, replaced_by_wu_id: str) luplo.core.models.WorkUnit
async find_existing_import_wu(*, project_id: str, content_hash_set: tuple[str, ...]) luplo.core.models.WorkUnit | None
async create_system(*, id: str, project_id: str, name: str, description: str | None = None, depends_on_system_ids: list[str] | None = None) luplo.core.models.System
async get_system(id: str, *, project_id: str | None = None) luplo.core.models.System | None
async list_systems(project_id: str) list[luplo.core.models.System]
async update_system(id: str, **kwargs: Any) luplo.core.models.System | None
async create_item(data: luplo.core.models.ItemCreate) luplo.core.models.Item
async get_item(id: str, *, project_id: str | None = None) luplo.core.models.Item | None
async list_items(project_id: str, *, item_type: str | None = None, system_id: str | None = None, work_unit_id: str | None = None, include_deleted: bool = False, limit: int = 100, offset: int = 0) list[luplo.core.models.Item]
async delete_item(id: str, *, actor_id: str) None
async get_supersedes_chain(id: str) list[luplo.core.models.Item]
async impact(item_id: str, project_id: str, *, depth: int = 5) luplo.core.impact.ImpactResult
async run_checks(project_id: str, *, rule_names: list[str] | None = None, disabled: tuple[str, ...] = ()) list[luplo.core.checks.Finding]
async search(query: str, project_id: str, *, item_types: list[str] | None = None, system_ids: list[str] | None = None, limit: int = 10, tsquery: str | None = None) list[luplo.core.models.SearchResult]
async add_capture(*, text: str, created_by: str | None = None, summary: str | None = None, sensitivity_hint: str = 'none', signals: dict[str, Any] | None = None) luplo.core.models.Capture
async list_captures(*, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 100) list[luplo.core.models.Capture]
async get_capture(capture_id: str) luplo.core.models.Capture | None
async search_captures(*, query: str | None = None, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 50) list[luplo.core.models.Capture]
async set_capture_state(capture_id: str, *, review_state: str, actor_id: str | None = None) luplo.core.models.Capture
async discard_capture(capture_id: str, *, actor_id: str | None = None) luplo.core.models.Capture
async redact_capture(capture_id: str, *, redacted_by: str | None = None) luplo.core.models.Capture
async annotate_capture(capture_id: str, *, summary: str | None = None, sensitivity_hint: str | None = None, signals: dict[str, Any] | None = None) luplo.core.models.Capture
async promote_capture_to_item(capture_id: str, data: luplo.core.models.ItemCreate) tuple[luplo.core.models.Capture, luplo.core.models.Item]
async create_glossary_group(*, id: str, project_id: str, canonical: str, definition: str | None = None, scope: str = 'project', scope_id: str | None = None, created_by: str | None = None) luplo.core.models.GlossaryGroup
async get_glossary_group(id: str, *, project_id: str | None = None) luplo.core.models.GlossaryGroup | None
async list_glossary_groups(project_id: str, *, needs_review: bool = False, limit: int = 100, offset: int = 0) list[luplo.core.models.GlossaryGroup]
async create_glossary_term(*, id: str, group_id: str | None, surface: str, normalized: str, is_protected: bool = False, status: str = 'pending', source_item_id: str | None = None, context_snippet: str | None = None) luplo.core.models.GlossaryTerm
async list_pending_terms(project_id: str, *, limit: int = 50) list[luplo.core.models.GlossaryTerm]
async create_glossary_group_with_canonical(*, project_id: str, canonical: str, definition: str | None = None, actor_id: str | None = None) tuple[luplo.core.models.GlossaryGroup, luplo.core.models.GlossaryTerm]
async add_term_to_group(group_id: str, *, surface: str, actor_id: str, as_canonical: bool = False) luplo.core.models.GlossaryTerm
async delete_glossary_term(term_id: str, *, actor_id: str) bool
async approve_term(term_id: str, *, group_id: str, actor_id: str, as_canonical: bool = False) luplo.core.models.GlossaryTerm | None
async reject_term(term_id: str, *, actor_id: str, reason: str | None = None) luplo.core.models.GlossaryRejection | None
async merge_groups(source_group_id: str, target_group_id: str, *, actor_id: str) luplo.core.models.GlossaryGroup | None
async split_term(term_id: str, *, new_canonical: str, actor_id: str) luplo.core.models.GlossaryGroup | None
async expand_query(query: str, project_id: str) str
async record_history(*, item_id: str, version: int, changed_by: str, content_before: str | None = None, content_after: str | None = None, content_hash_before: str | None = None, content_hash_after: str | None = None, diff_summary: str | None = None, semantic_impact: str | None = None, source_event_id: str | None = None) luplo.core.models.HistoryEntry
async query_history(*, project_id: str | None = None, item_id: str | None = None, since: datetime.datetime | None = None, semantic_impacts: list[str] | None = None, limit: int = 50) list[luplo.core.models.HistoryEntry]
async record_audit(*, actor_id: str, action: str, target_type: str | None = None, target_id: str | None = None, metadata: dict[str, Any] | None = None) None
async create_task(*, project_id: str, work_unit_id: str, title: str, actor_id: str, sort_order: int | None = None, systems: list[str] | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) luplo.core.models.Item
async get_task(task_id: str, *, project_id: str | None = None) luplo.core.models.Item | None
async list_tasks(work_unit_id: str, *, status: str | None = None) list[luplo.core.models.Item]
async get_in_progress_task(work_unit_id: str) luplo.core.models.Item | None
async start_task(task_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async complete_task(task_id: str, *, actor_id: str, summary: str | None = None, project_id: str | None = None) luplo.core.models.Item
async block_task(task_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async skip_task(task_id: str, *, actor_id: str, reason: str | None = None, project_id: str | None = None) luplo.core.models.Item
async reorder_tasks(work_unit_id: str, task_ids: list[str], *, actor_id: str, project_id: str | None = None) list[luplo.core.models.Item]
async suggest_decision_from_task(task_id: str, *, project_id: str | None = None) luplo.core.models.ItemCreate | None
async edit_task(task_id: str, *, actor_id: str, title: str | None = None, body: str | None = None, sort_order: int | None = None, project_id: str | None = None) luplo.core.models.Item
async add_idea(*, project_id: str, work_unit_id: str, text: str, created_by: str | None = None) luplo.core.models.Idea
async list_ideas(*, work_unit_id: str, project_id: str | None = None, limit: int = 100, include_redacted: bool = False) list[luplo.core.models.Idea]
async search_ideas(*, project_id: str, query: str | None = None, tsquery: str | None = None, work_unit_id: str | None = None, author: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, include_redacted: bool = False, limit: int = 50) list[luplo.core.models.Idea]
async get_idea(idea_id: str, *, project_id: str | None = None) luplo.core.models.Idea | None
async redact_idea(*, idea_id: str, redacted_by: str, project_id: str | None = None) tuple[luplo.core.models.Idea, bool]
async create_qa(*, project_id: str, title: str, actor_id: str, coverage: str, areas: list[str] | None = None, target_item_ids: list[str] | None = None, target_task_ids: list[str] | None = None, work_unit_id: str | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) luplo.core.models.Item
async get_qa(qa_id: str, *, project_id: str | None = None) luplo.core.models.Item | None
async list_qa(project_id: str, *, status: str | None = None, work_unit_id: str | None = None) list[luplo.core.models.Item]
async list_pending_qa_for_task(task_id: str) list[luplo.core.models.Item]
async list_pending_qa_for_item(item_id: str) list[luplo.core.models.Item]
async list_pending_qa_for_wu(work_unit_id: str) list[luplo.core.models.Item]
async start_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async pass_qa(qa_id: str, *, actor_id: str, evidence: str | None = None, project_id: str | None = None) luplo.core.models.Item
async fail_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async block_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async skip_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async assign_qa(qa_id: str, *, actor_id: str, assignee_actor_id: str, project_id: str | None = None) luplo.core.models.Item
async enqueue_sync(*, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) luplo.core.models.SyncJob
async get_ready_sync_jobs(*, limit: int = 1) list[luplo.core.models.SyncJob]
async complete_sync_job(job_id: int) None
async fail_sync_job(job_id: int, *, error: str) None
class luplo.core.backend.Backend

Bases: Protocol

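Protocol describing the backend interface (local: direct PostgreSQL; remote: HTTP). The text below is the generic description inherited from typing.Protocol.

Base class for protocol classes.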

Protocol classes are defined as:

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example:

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic; they are defined as:

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
async create_project(*, id: str, name: str, description: str | None = None) luplo.core.models.Project
async get_project(id: str) luplo.core.models.Project | None
async list_projects() list[luplo.core.models.Project]
async create_actor(*, id: str, name: str, email: str | None = None, role: str | None = None, external_ids: dict[str, str] | None = None) luplo.core.models.Actor
async get_actor(id: str) luplo.core.models.Actor | None
async get_actor_by_email(email: str) luplo.core.models.Actor | None
async list_item_types() list[luplo.core.models.ItemType]
async get_item_type(key: str) luplo.core.models.ItemType | None
async create_item_type(*, key: str, display_name: str, schema: dict[str, Any], owner: str = 'user') luplo.core.models.ItemType
async create_task(*, project_id: str, work_unit_id: str, title: str, actor_id: str, sort_order: int | None = None, systems: list[str] | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) luplo.core.models.Item
async get_task(task_id: str, *, project_id: str | None = None) luplo.core.models.Item | None

Fetch a task head by full UUID or hex prefix (≥8 chars).

project_id (when provided) scopes prefix lookups to a single project so prefixes from other projects do not collide.

async list_tasks(work_unit_id: str, *, status: str | None = None) list[luplo.core.models.Item]
async get_in_progress_task(work_unit_id: str) luplo.core.models.Item | None
async start_task(task_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async complete_task(task_id: str, *, actor_id: str, summary: str | None = None, project_id: str | None = None) luplo.core.models.Item
async block_task(task_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async skip_task(task_id: str, *, actor_id: str, reason: str | None = None, project_id: str | None = None) luplo.core.models.Item
async reorder_tasks(work_unit_id: str, task_ids: list[str], *, actor_id: str, project_id: str | None = None) list[luplo.core.models.Item]
async edit_task(task_id: str, *, actor_id: str, title: str | None = None, body: str | None = None, sort_order: int | None = None, project_id: str | None = None) luplo.core.models.Item
async suggest_decision_from_task(task_id: str, *, project_id: str | None = None) luplo.core.models.ItemCreate | None

Build (never insert) a decision-item draft derived from a task.

Returns None when the task lacks body/summary content.

async add_capture(*, text: str, created_by: str | None = None, summary: str | None = None, sensitivity_hint: str = 'none', signals: dict[str, Any] | None = None) luplo.core.models.Capture
async list_captures(*, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 100) list[luplo.core.models.Capture]
async get_capture(capture_id: str) luplo.core.models.Capture | None
async search_captures(*, query: str | None = None, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 50) list[luplo.core.models.Capture]
async set_capture_state(capture_id: str, *, review_state: str, actor_id: str | None = None) luplo.core.models.Capture
async discard_capture(capture_id: str, *, actor_id: str | None = None) luplo.core.models.Capture
async redact_capture(capture_id: str, *, redacted_by: str | None = None) luplo.core.models.Capture
async annotate_capture(capture_id: str, *, summary: str | None = None, sensitivity_hint: str | None = None, signals: dict[str, Any] | None = None) luplo.core.models.Capture
async promote_capture_to_item(capture_id: str, data: luplo.core.models.ItemCreate) tuple[luplo.core.models.Capture, luplo.core.models.Item]
async create_qa(*, project_id: str, title: str, actor_id: str, coverage: str, areas: list[str] | None = None, target_item_ids: list[str] | None = None, target_task_ids: list[str] | None = None, work_unit_id: str | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) luplo.core.models.Item
async get_qa(qa_id: str, *, project_id: str | None = None) luplo.core.models.Item | None

Fetch a qa_check head by full UUID or hex prefix (≥8 chars).

project_id (when provided) scopes prefix lookups.

async list_qa(project_id: str, *, status: str | None = None, work_unit_id: str | None = None) list[luplo.core.models.Item]
async list_pending_qa_for_task(task_id: str) list[luplo.core.models.Item]
async list_pending_qa_for_item(item_id: str) list[luplo.core.models.Item]
async list_pending_qa_for_wu(work_unit_id: str) list[luplo.core.models.Item]
async start_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async pass_qa(qa_id: str, *, actor_id: str, evidence: str | None = None, project_id: str | None = None) luplo.core.models.Item
async fail_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async block_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) luplo.core.models.Item
async skip_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) luplo.core.models.Item
async assign_qa(qa_id: str, *, actor_id: str, assignee_actor_id: str, project_id: str | None = None) luplo.core.models.Item
async add_idea(*, project_id: str, work_unit_id: str, text: str, created_by: str | None = None) luplo.core.models.Idea
async list_ideas(*, work_unit_id: str, project_id: str | None = None, limit: int = 100, include_redacted: bool = False) list[luplo.core.models.Idea]
async search_ideas(*, project_id: str, query: str | None = None, tsquery: str | None = None, work_unit_id: str | None = None, author: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, include_redacted: bool = False, limit: int = 50) list[luplo.core.models.Idea]
async get_idea(idea_id: str, *, project_id: str | None = None) luplo.core.models.Idea | None
async redact_idea(*, idea_id: str, redacted_by: str, project_id: str | None = None) tuple[luplo.core.models.Idea, bool]

Redact an idea; return (idea, newly_redacted).

newly_redacted=False indicates an idempotent retry — the row was already redacted, so callers should skip side-effects.

async open_work_unit(*, id: str, project_id: str, title: str, description: str | None = None, system_ids: list[str] | None = None, created_by: str | None = None, context: dict[str, Any] | None = None) luplo.core.models.WorkUnit
async get_work_unit(id: str, *, project_id: str | None = None) luplo.core.models.WorkUnit | None

Fetch a work unit by full UUID or hex prefix (≥8 chars).

async list_work_units(project_id: str, *, status: str | None = None) list[luplo.core.models.WorkUnit]
async close_work_unit(id: str, *, actor_id: str, force: bool = False) luplo.core.models.WorkUnit | None

Sets status='done', closed_at=now(), closed_by=actor_id.

Returns None when the work unit does not exist. Refuses the close when an in_progress task remains and force is False (raises WorkUnitHasActiveTasksError).

async archive_work_unit(*, id: str, archived_by: str, replaced_by_wu_id: str) luplo.core.models.WorkUnit

Mark a work unit as superseded by a force-import.

Sets status to 'archived', records the replacement wu_id in context, and stamps closed_at/closed_by. Distinct from close_work_unit (status='done') and from abandoned (user gave up).

async find_existing_import_wu(*, project_id: str, content_hash_set: tuple[str, ...]) luplo.core.models.WorkUnit | None

Find a non-archived import work unit whose content matches.

Used by lp import begin for dedup: returns the most recently created work unit in the project whose context.kind == 'import' and whose context.content_hash_set (sorted) equals the given hashes. Archived and abandoned work units are excluded.

The dedup key is content-hash-based (not path-based) so the same bundle imported under different paths or from different working directories collapses to one work_unit. Both LocalBackend and the cloud (RemoteBackend → SaaS) match on this invariant.

Returns None when no match exists.

async create_system(*, id: str, project_id: str, name: str, description: str | None = None, depends_on_system_ids: list[str] | None = None) luplo.core.models.System
async get_system(id: str, *, project_id: str | None = None) luplo.core.models.System | None

Fetch a system by full UUID or hex prefix (≥8 chars).

async list_systems(project_id: str) list[luplo.core.models.System]
async update_system(id: str, *, description: str | None = ..., depends_on_system_ids: list[str] | None = ..., status: str | None = ...) luplo.core.models.System | None

Only updates fields that are explicitly passed (i.e. not left at the sentinel default `...`).

Returns None when the system does not exist.

async create_item(data: luplo.core.models.ItemCreate) luplo.core.models.Item

Insert new item. If data.supersedes_id is set, this is an edit (new row superseding the old one). ID is auto-generated.

async get_item(id: str, *, project_id: str | None = None) luplo.core.models.Item | None

Fetch an item by full UUID or hex prefix (≥8 chars).

Returns None if not found or soft-deleted. project_id (when provided) scopes prefix lookups.

async list_items(project_id: str, *, item_type: str | None = None, system_id: str | None = None, work_unit_id: str | None = None, include_deleted: bool = False, limit: int = 100, offset: int = 0) list[luplo.core.models.Item]
async delete_item(id: str, *, actor_id: str) None

Soft delete — sets deleted_at, never removes the row.

async get_supersedes_chain(id: str) list[luplo.core.models.Item]

Walk supersedes_id backward. Returns oldest-first.

async impact(item_id: str, project_id: str, *, depth: int = 5) luplo.core.impact.ImpactResult

Traverse outgoing typed edges to find the item’s blast radius.

async run_checks(project_id: str, *, rule_names: list[str] | None = None) list[luplo.core.checks.Finding]

Run the enabled rule pack against project_id and return findings.

async search(query: str, project_id: str, *, item_types: list[str] | None = None, system_ids: list[str] | None = None, limit: int = 10, tsquery: str | None = None) list[luplo.core.models.SearchResult]

Full pipeline: glossary expand → tsquery → vector rerank.

When tsquery is set, the simple-dialect parser and glossary expansion are bypassed; the string is passed straight to PostgreSQL to_tsquery. Caller owns synonym coverage and syntax validity.

async create_glossary_group(*, id: str, project_id: str, canonical: str, definition: str | None = None, scope: str = 'project', scope_id: str | None = None, created_by: str | None = None) luplo.core.models.GlossaryGroup
async get_glossary_group(id: str, *, project_id: str | None = None) luplo.core.models.GlossaryGroup | None

Fetch a glossary group by full UUID or hex prefix (≥8 chars).

async list_glossary_groups(project_id: str, *, needs_review: bool = False, limit: int = 100, offset: int = 0) list[luplo.core.models.GlossaryGroup]
async create_glossary_term(*, id: str, group_id: str | None, surface: str, normalized: str, is_protected: bool = False, status: str = 'pending', source_item_id: str | None = None, context_snippet: str | None = None) luplo.core.models.GlossaryTerm
async list_pending_terms(project_id: str, *, limit: int = 50) list[luplo.core.models.GlossaryTerm]
async create_glossary_group_with_canonical(*, project_id: str, canonical: str, definition: str | None = None, actor_id: str | None = None) tuple[luplo.core.models.GlossaryGroup, luplo.core.models.GlossaryTerm]

Create a group plus its canonical surface term in one step.

async add_term_to_group(group_id: str, *, surface: str, actor_id: str, as_canonical: bool = False) luplo.core.models.GlossaryTerm

Add a new term to a group; default status is alias.

With as_canonical=True the existing canonical (if any) is demoted to alias before this term is inserted as canonical.

async delete_glossary_term(term_id: str, *, actor_id: str) bool

Permanently delete a term; cascade-delete the group when empty.

async approve_term(term_id: str, *, group_id: str, actor_id: str, as_canonical: bool = False) luplo.core.models.GlossaryTerm | None

Set status to 'canonical' (if as_canonical) or 'alias'.

Returns None when the term does not exist.

async reject_term(term_id: str, *, actor_id: str, reason: str | None = None) luplo.core.models.GlossaryRejection | None

Sets term status='rejected' and inserts a glossary_rejections row. The system will never re-propose this match.

Returns None when the term does not exist.

async merge_groups(source_group_id: str, target_group_id: str, *, actor_id: str) luplo.core.models.GlossaryGroup | None

Move all terms from source into target, delete source group.

Returns None when either group does not exist.

async split_term(term_id: str, *, new_canonical: str, actor_id: str) luplo.core.models.GlossaryGroup | None

Remove term from its group, create a new group with it as canonical.

Returns None when the term does not exist.

async expand_query(query: str, project_id: str) str

Glossary expansion: 'auth budget' → '(auth|signin) & (budget|goldpool)'.

async record_history(*, item_id: str, version: int, changed_by: str, content_before: str | None = None, content_after: str | None = None, content_hash_before: str | None = None, content_hash_after: str | None = None, diff_summary: str | None = None, semantic_impact: str | None = None, source_event_id: str | None = None) luplo.core.models.HistoryEntry
async query_history(*, project_id: str | None = None, item_id: str | None = None, since: datetime.datetime | None = None, semantic_impacts: list[str] | None = None, limit: int = 50) list[luplo.core.models.HistoryEntry]
async record_audit(*, actor_id: str, action: str, target_type: str | None = None, target_id: str | None = None, metadata: dict[str, Any] | None = None) None
async enqueue_sync(*, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) luplo.core.models.SyncJob

Debounce queue: if a pending job exists for the same page, merge into it (bump scheduled_at, replace payload).

async get_ready_sync_jobs(*, limit: int = 1) list[luplo.core.models.SyncJob]

Fetch jobs where scheduled_at <= now() and status='pending'.

async complete_sync_job(job_id: int) None

Mark status='completed'.

async fail_sync_job(job_id: int, *, error: str) None

Increment attempts, set last_error. If attempts >= 3, set status='failed'.