luplo.core.backend.local
========================

.. py:module:: luplo.core.backend.local

.. autoapi-nested-parse::

   LocalBackend: implements the Backend Protocol via direct PG access.

   All core module functions take a raw ``AsyncConnection``. This class
   acquires a connection from the pool for each operation, adds cross-cutting
   concerns (audit logging, history recording on supersede), and delegates
   to the corresponding core function.


Classes
-------

.. autoapisummary::

   luplo.core.backend.local.LocalBackend


Module Contents
---------------

.. py:class:: LocalBackend(pool: psycopg_pool.AsyncConnectionPool, embedding: luplo.core.embedding.EmbeddingBackend | None = None)

   Backend that talks directly to PostgreSQL via a connection pool.

   This is the primary backend for ``lp init --local`` (single-user mode)
   and for the FastAPI server's internal use.

   :param pool: An open ``AsyncConnectionPool``.
   :param embedding: Embedding backend for search reranking. Defaults to
                     ``NullEmbedding`` (no vectors).


   .. py:attribute:: pool


   .. py:method:: create_project(*, id: str, name: str, description: str | None = None) -> luplo.core.models.Project
      :async:


   .. py:method:: get_project(id: str) -> luplo.core.models.Project | None
      :async:


   .. py:method:: list_projects() -> list[luplo.core.models.Project]
      :async:


   .. py:method:: create_actor(*, id: str, name: str, email: str | None = None, role: str | None = None, external_ids: dict[str, str] | None = None) -> luplo.core.models.Actor
      :async:


   .. py:method:: get_actor(id: str) -> luplo.core.models.Actor | None
      :async:


   .. py:method:: list_item_types() -> list[luplo.core.models.ItemType]
      :async:


   .. py:method:: get_item_type(key: str) -> luplo.core.models.ItemType | None
      :async:


   .. py:method:: create_item_type(*, key: str, display_name: str, schema: dict[str, Any], owner: str = 'user') -> luplo.core.models.ItemType
      :async:


   .. py:method:: get_actor_by_email(email: str) -> luplo.core.models.Actor | None
      :async:


   .. py:method:: open_work_unit(*, id: str, project_id: str, title: str, description: str | None = None, system_ids: list[str] | None = None, created_by: str | None = None, context: dict[str, Any] | None = None) -> luplo.core.models.WorkUnit
      :async:


   .. py:method:: get_work_unit(id: str, *, project_id: str | None = None) -> luplo.core.models.WorkUnit | None
      :async:


   .. py:method:: list_work_units(project_id: str, *, status: str | None = None) -> list[luplo.core.models.WorkUnit]
      :async:


   .. py:method:: close_work_unit(id: str, *, actor_id: str, force: bool = False) -> luplo.core.models.WorkUnit | None
      :async:


   .. py:method:: archive_work_unit(*, id: str, archived_by: str, replaced_by_wu_id: str) -> luplo.core.models.WorkUnit
      :async:


   .. py:method:: find_existing_import_wu(*, project_id: str, content_hash_set: tuple[str, Ellipsis]) -> luplo.core.models.WorkUnit | None
      :async:


   .. py:method:: create_system(*, id: str, project_id: str, name: str, description: str | None = None, depends_on_system_ids: list[str] | None = None) -> luplo.core.models.System
      :async:


   .. py:method:: get_system(id: str, *, project_id: str | None = None) -> luplo.core.models.System | None
      :async:


   .. py:method:: list_systems(project_id: str) -> list[luplo.core.models.System]
      :async:


   .. py:method:: update_system(id: str, **kwargs: Any) -> luplo.core.models.System | None
      :async:


   .. py:method:: create_item(data: luplo.core.models.ItemCreate) -> luplo.core.models.Item
      :async:


   .. py:method:: get_item(id: str, *, project_id: str | None = None) -> luplo.core.models.Item | None
      :async:


   .. py:method:: list_items(project_id: str, *, item_type: str | None = None, system_id: str | None = None, work_unit_id: str | None = None, include_deleted: bool = False, limit: int = 100, offset: int = 0) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: delete_item(id: str, *, actor_id: str) -> None
      :async:


   .. py:method:: get_supersedes_chain(id: str) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: create_link(*, from_item_id: str, to_item_id: str, link_type: str, strength: int = 5, note: str | None = None, actor_id: str | None = None) -> luplo.core.models.Link
      :async:


   .. py:method:: get_links(item_id: str, *, direction: str = 'from', link_type: str | None = None) -> list[luplo.core.models.Link]
      :async:


   .. py:method:: delete_link(from_item_id: str, to_item_id: str, link_type: str) -> None
      :async:


   .. py:method:: impact(item_id: str, project_id: str, *, depth: int = 5) -> luplo.core.impact.ImpactResult
      :async:


   .. py:method:: run_checks(project_id: str, *, rule_names: list[str] | None = None, disabled: tuple[str, Ellipsis] = ()) -> list[luplo.core.checks.Finding]
      :async:


   .. py:method:: search(query: str, project_id: str, *, item_types: list[str] | None = None, system_ids: list[str] | None = None, limit: int = 10, tsquery: str | None = None) -> list[luplo.core.models.SearchResult]
      :async:


   .. py:method:: add_capture(*, text: str, created_by: str | None = None, summary: str | None = None, sensitivity_hint: str = 'none', signals: dict[str, Any] | None = None) -> luplo.core.models.Capture
      :async:


   .. py:method:: list_captures(*, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 100) -> list[luplo.core.models.Capture]
      :async:


   .. py:method:: get_capture(capture_id: str) -> luplo.core.models.Capture | None
      :async:


   .. py:method:: search_captures(*, query: str | None = None, review_state: str | None = None, include_discarded: bool = False, include_redacted: bool = False, since: datetime.datetime | None = None, until: datetime.datetime | None = None, limit: int = 50) -> list[luplo.core.models.Capture]
      :async:


   .. py:method:: set_capture_state(capture_id: str, *, review_state: str, actor_id: str | None = None) -> luplo.core.models.Capture
      :async:


   .. py:method:: discard_capture(capture_id: str, *, actor_id: str | None = None) -> luplo.core.models.Capture
      :async:


   .. py:method:: redact_capture(capture_id: str, *, redacted_by: str | None = None) -> luplo.core.models.Capture
      :async:


   .. py:method:: annotate_capture(capture_id: str, *, summary: str | None = None, sensitivity_hint: str | None = None, signals: dict[str, Any] | None = None) -> luplo.core.models.Capture
      :async:


   .. py:method:: promote_capture_to_item(capture_id: str, data: luplo.core.models.ItemCreate) -> tuple[luplo.core.models.Capture, luplo.core.models.Item]
      :async:


   .. py:method:: create_glossary_group(*, id: str, project_id: str, canonical: str, definition: str | None = None, scope: str = 'project', scope_id: str | None = None, created_by: str | None = None) -> luplo.core.models.GlossaryGroup
      :async:


   .. py:method:: get_glossary_group(id: str, *, project_id: str | None = None) -> luplo.core.models.GlossaryGroup | None
      :async:


   .. py:method:: list_glossary_groups(project_id: str, *, needs_review: bool = False, limit: int = 100, offset: int = 0) -> list[luplo.core.models.GlossaryGroup]
      :async:


   .. py:method:: create_glossary_term(*, id: str, group_id: str | None, surface: str, normalized: str, is_protected: bool = False, status: str = 'pending', source_item_id: str | None = None, context_snippet: str | None = None) -> luplo.core.models.GlossaryTerm
      :async:


   .. py:method:: list_pending_terms(project_id: str, *, limit: int = 50) -> list[luplo.core.models.GlossaryTerm]
      :async:


   .. py:method:: create_glossary_group_with_canonical(*, project_id: str, canonical: str, definition: str | None = None, actor_id: str | None = None) -> tuple[luplo.core.models.GlossaryGroup, luplo.core.models.GlossaryTerm]
      :async:


   .. py:method:: add_term_to_group(group_id: str, *, surface: str, actor_id: str, as_canonical: bool = False) -> luplo.core.models.GlossaryTerm
      :async:


   .. py:method:: delete_glossary_term(term_id: str, *, actor_id: str) -> bool
      :async:


   .. py:method:: approve_term(term_id: str, *, group_id: str, actor_id: str, as_canonical: bool = False) -> luplo.core.models.GlossaryTerm | None
      :async:


   .. py:method:: reject_term(term_id: str, *, actor_id: str, reason: str | None = None) -> luplo.core.models.GlossaryRejection | None
      :async:


   .. py:method:: merge_groups(source_group_id: str, target_group_id: str, *, actor_id: str) -> luplo.core.models.GlossaryGroup | None
      :async:


   .. py:method:: split_term(term_id: str, *, new_canonical: str, actor_id: str) -> luplo.core.models.GlossaryGroup | None
      :async:


   .. py:method:: expand_query(query: str, project_id: str) -> str
      :async:


   .. py:method:: record_history(*, item_id: str, version: int, changed_by: str, content_before: str | None = None, content_after: str | None = None, content_hash_before: str | None = None, content_hash_after: str | None = None, diff_summary: str | None = None, semantic_impact: str | None = None, source_event_id: str | None = None) -> luplo.core.models.HistoryEntry
      :async:


   .. py:method:: query_history(*, project_id: str | None = None, item_id: str | None = None, since: datetime.datetime | None = None, semantic_impacts: list[str] | None = None, limit: int = 50) -> list[luplo.core.models.HistoryEntry]
      :async:


   .. py:method:: record_audit(*, actor_id: str, action: str, target_type: str | None = None, target_id: str | None = None, metadata: dict[str, Any] | None = None) -> None
      :async:


   .. py:method:: create_task(*, project_id: str, work_unit_id: str, title: str, actor_id: str, sort_order: int | None = None, systems: list[str] | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: get_task(task_id: str, *, project_id: str | None = None) -> luplo.core.models.Item | None
      :async:


   .. py:method:: list_tasks(work_unit_id: str, *, status: str | None = None) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: get_in_progress_task(work_unit_id: str) -> luplo.core.models.Item | None
      :async:


   .. py:method:: start_task(task_id: str, *, actor_id: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: complete_task(task_id: str, *, actor_id: str, summary: str | None = None, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: block_task(task_id: str, *, actor_id: str, reason: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: skip_task(task_id: str, *, actor_id: str, reason: str | None = None, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: reorder_tasks(work_unit_id: str, task_ids: list[str], *, actor_id: str, project_id: str | None = None) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: suggest_decision_from_task(task_id: str, *, project_id: str | None = None) -> luplo.core.models.ItemCreate | None
      :async:


   .. py:method:: edit_task(task_id: str, *, actor_id: str, title: str | None = None, body: str | None = None, sort_order: int | None = None, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: add_idea(*, project_id: str, work_unit_id: str, text: str, created_by: str | None = None) -> luplo.core.models.Idea
      :async:


   .. py:method:: list_ideas(*, work_unit_id: str, project_id: str | None = None, limit: int = 100, include_redacted: bool = False) -> list[luplo.core.models.Idea]
      :async:


   .. py:method:: search_ideas(*, project_id: str, query: str | None = None, tsquery: str | None = None, work_unit_id: str | None = None, author: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, include_redacted: bool = False, limit: int = 50) -> list[luplo.core.models.Idea]
      :async:


   .. py:method:: get_idea(idea_id: str, *, project_id: str | None = None) -> luplo.core.models.Idea | None
      :async:


   .. py:method:: redact_idea(*, idea_id: str, redacted_by: str, project_id: str | None = None) -> tuple[luplo.core.models.Idea, bool]
      :async:


   .. py:method:: create_qa(*, project_id: str, title: str, actor_id: str, coverage: str, areas: list[str] | None = None, target_item_ids: list[str] | None = None, target_task_ids: list[str] | None = None, work_unit_id: str | None = None, body: str | None = None, context_extra: dict[str, Any] | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: get_qa(qa_id: str, *, project_id: str | None = None) -> luplo.core.models.Item | None
      :async:


   .. py:method:: list_qa(project_id: str, *, status: str | None = None, work_unit_id: str | None = None) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: list_pending_qa_for_task(task_id: str) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: list_pending_qa_for_item(item_id: str) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: list_pending_qa_for_wu(work_unit_id: str) -> list[luplo.core.models.Item]
      :async:


   .. py:method:: start_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: pass_qa(qa_id: str, *, actor_id: str, evidence: str | None = None, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: fail_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: block_qa(qa_id: str, *, actor_id: str, reason: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: skip_qa(qa_id: str, *, actor_id: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: assign_qa(qa_id: str, *, actor_id: str, assignee_actor_id: str, project_id: str | None = None) -> luplo.core.models.Item
      :async:


   .. py:method:: enqueue_sync(*, source_type: str, source_page_id: str, payload: str | None = None, source_event_id: str | None = None, debounce_seconds: int = 300) -> luplo.core.models.SyncJob
      :async:


   .. py:method:: get_ready_sync_jobs(*, limit: int = 1) -> list[luplo.core.models.SyncJob]
      :async:


   .. py:method:: complete_sync_job(job_id: int) -> None
      :async:


   .. py:method:: fail_sync_job(job_id: int, *, error: str) -> None
      :async:
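
The module description says that each operation acquires a connection from the pool, adds cross-cutting concerns such as audit logging, and delegates to a core function that takes a raw connection. The sketch below illustrates that pattern only; it is not the actual ``LocalBackend`` source. ``FakePool``, ``FakeConnection``, ``core_delete_item``, ``core_record_audit``, ``SketchBackend``, and the ``"item.delete"`` action string are all hypothetical stand-ins, chosen so the example runs without PostgreSQL. The one assumption borrowed from the real stack is that the pool exposes ``connection()`` as an async context manager, as ``psycopg_pool.AsyncConnectionPool`` does.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a raw connection; records calls made on it."""

    log: list[str] = field(default_factory=list)


class FakePool:
    """Hypothetical stand-in for a pool exposing ``connection()``."""

    def __init__(self) -> None:
        self.acquired = 0
        self.last: FakeConnection | None = None

    @asynccontextmanager
    async def connection(self):
        # Hand out a fresh connection per operation and count acquisitions.
        self.acquired += 1
        conn = FakeConnection()
        self.last = conn
        yield conn


# Hypothetical core functions: each takes the raw connection first.
async def core_delete_item(conn: FakeConnection, id: str) -> None:
    conn.log.append(f"delete_item:{id}")


async def core_record_audit(
    conn: FakeConnection, *, actor_id: str, action: str, target_id: str
) -> None:
    conn.log.append(f"audit:{actor_id}:{action}:{target_id}")


class SketchBackend:
    """Illustrative backend: acquire, delegate, then record an audit entry."""

    def __init__(self, pool: FakePool) -> None:
        self.pool = pool

    async def delete_item(self, id: str, *, actor_id: str) -> None:
        async with self.pool.connection() as conn:
            await core_delete_item(conn, id)
            # Cross-cutting concern added by the backend, not the core function.
            await core_record_audit(
                conn, actor_id=actor_id, action="item.delete", target_id=id
            )


async def demo() -> tuple[int, list[str]]:
    pool = FakePool()
    backend = SketchBackend(pool)
    await backend.delete_item("item-1", actor_id="alice")
    await backend.delete_item("item-2", actor_id="alice")
    assert pool.last is not None
    return pool.acquired, pool.last.log


acquired, last_log = asyncio.run(demo())
```

In this sketch two operations lead to two separate acquisitions, and the audit entry is written on the same connection as the delegated call. Whether the real backend wraps both in a single transaction is not stated in the reference above.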