luplo.core.work_units

CRUD operations for the work_units table.

Work units are the user-facing grouping of intent — “vendor system design”, “karma rework”, etc. They replace the old sessions concept and live across multiple Claude sessions. A→B developer handoff is the core use case: created_by and closed_by can differ naturally.

Functions

open_work_unit(→ luplo.core.models.WorkUnit)

Create a new work unit in in_progress status.

get_work_unit(→ luplo.core.models.WorkUnit | None)

Fetch a single work unit by ID or hex prefix (≥8 chars).

list_work_units(→ list[luplo.core.models.WorkUnit])

List work units for a project, optionally filtered by status.

find_work_units(→ list[luplo.core.models.WorkUnit])

Search in-progress work units by title substring (case-insensitive).

find_existing_import_wu(...)

Find a non-archived import work unit whose content hash set matches.

archive_work_unit(→ tuple[luplo.core.models.WorkUnit, int])

Mark a work unit as superseded by a force-import.

close_work_unit(→ luplo.core.models.WorkUnit | None)

Close a work unit by setting its status, closed_at, and closed_by.

Module Contents

async luplo.core.work_units.open_work_unit(conn: psycopg.AsyncConnection[Any], *, project_id: str, title: str, description: str | None = None, system_ids: list[str] | None = None, created_by: str | None = None, id: str | None = None, context: dict[str, Any] | None = None) luplo.core.models.WorkUnit

Create a new work unit in in_progress status.

Parameters:
  • conn – Async psycopg connection.

  • project_id – Project this work unit belongs to.

  • title – Human-readable title (e.g. “Vendor system design”).

  • description – Optional longer description.

  • system_ids – Systems this work unit touches.

  • created_by – Actor ID of who opened it.

  • context – Free-form JSONB metadata (e.g. lp import dedup state). Defaults to an empty object. Mirrors the items.context shape introduced in migration 0003.

Returns:

The newly created WorkUnit.

async luplo.core.work_units.get_work_unit(conn: psycopg.AsyncConnection[Any], wu_id: str, *, project_id: str | None = None) luplo.core.models.WorkUnit | None

Fetch a single work unit by ID or hex prefix (≥8 chars).

Parameters:
  • conn – Open async connection.

  • wu_id – Full UUID or hex prefix.

  • project_id – Optional scope; strongly recommended whenever the caller knows the project to avoid cross-project collisions.

Returns:

The work unit, or None when nothing matches.

Raises:
async luplo.core.work_units.list_work_units(conn: psycopg.AsyncConnection[Any], project_id: str, *, status: str | None = None) list[luplo.core.models.WorkUnit]

List work units for a project, optionally filtered by status.

Parameters:
  • conn – Async psycopg connection.

  • project_id – Required project scope.

  • status – Filter by status (e.g. "in_progress").

Returns:

List of WorkUnit ordered by created_at DESC.

async luplo.core.work_units.find_work_units(conn: psycopg.AsyncConnection[Any], project_id: str, query: str) list[luplo.core.models.WorkUnit]

Search in-progress work units by title substring (case-insensitive).

Used by luplo_work_resume to find a work unit to continue. Only returns in_progress work units.

Parameters:
  • conn – Async psycopg connection.

  • project_id – Required project scope.

  • query – Substring to match against title.

Returns:

Matching work units ordered by created_at DESC.

async luplo.core.work_units.find_existing_import_wu(conn: psycopg.AsyncConnection[Any], *, project_id: str, content_hash_set: tuple[str, Ellipsis]) luplo.core.models.WorkUnit | None

Find a non-archived import work unit whose content hash set matches.

Used by lp import begin for dedup. Matches when:

  • project_id matches the work unit’s project, AND

  • status is not in ('archived', 'abandoned') — done imports still occupy that content set, so a re-import without --force should refuse, AND

  • context->>'kind' = 'import', AND

  • the sorted set of context.content_hash_set equals the sorted form of the supplied content_hash_set (order-insensitive).

Content-hash matching (vs path matching) is what makes dedup invariant under cwd changes, symlinks, and the cloud MCP scenario where the server has no concept of the agent’s filesystem layout.

Parameters:
  • conn – Async psycopg connection.

  • project_id – Project scope for the lookup.

  • content_hash_set – Sha256 hashes captured by begin_import. The tuple is sorted in Python before binding to PG so the SQL comparison is order-insensitive.

Returns:

The most recently created matching WorkUnit, or None when no match exists.

async luplo.core.work_units.archive_work_unit(conn: psycopg.AsyncConnection[Any], wu_id: str, *, archived_by: str, replaced_by_wu_id: str) tuple[luplo.core.models.WorkUnit, int]

Mark a work unit as superseded by a force-import.

Sets status='archived', stamps closed_at/closed_by, and merges {"replaced_by": replaced_by_wu_id} into the existing context JSONB (pre-existing keys are preserved). In the same transaction, soft-deletes every active item linked to the work unit (work_unit_id = wu_id AND deleted_at IS NULL) by stamping deleted_at = now(). Rows are preserved for audit; default item-search paths simply stop returning them.

Distinct from close_work_unit() (status='done' or 'abandoned'): archive is the explicit “this work unit was replaced by a fresh re-import” signal used by lp import --force. Soft-deleting linked items prevents duplicate hits in default search when force-replace re-extracts the bundle (e.g. into a new language).

Parameters:
  • conn – Async psycopg connection.

  • wu_id – ID of the work unit being archived.

  • archived_by – Actor ID stamped into closed_by.

  • replaced_by_wu_id – ID of the successor work unit, recorded in context.replaced_by so consumers can follow the chain.

Returns:

A tuple of the updated WorkUnit and the number of items that were soft-deleted as part of the archive.

Raises:

ValueError – When no work unit matches wu_id, or when the work unit exists but is not in in_progress state. Mirrors the status='in_progress' guard used by close_work_unit() so that already-closed/abandoned/archived units cannot have their status, closed_at, closed_by, or context.replaced_by silently overwritten.

async luplo.core.work_units.close_work_unit(conn: psycopg.AsyncConnection[Any], wu_id: str, *, actor_id: str, status: str = 'done') luplo.core.models.WorkUnit | None

Close a work unit by setting its status, closed_at, and closed_by.

closed_by may differ from created_by — this is the natural record of an A→B developer handoff.

Parameters:
  • conn – Async psycopg connection.

  • wu_id – ID of the work unit to close.

  • actor_id – Who is closing it.

  • status – Target status ("done" or "abandoned").

Returns:

The updated WorkUnit, or None if it was not found or already closed.