Orchestrator

Runner

runner

Orchestrator actor lifecycle -- the "thinking" layer above workers.

The orchestrator is a longer-lived LLM actor that:

- Receives high-level goals (OrchestratorGoal messages)
- Decomposes them into subtasks for workers (via decomposer.py)
- Dispatches subtasks through the router and collects results
- Synthesizes worker outputs into a coherent final answer (via synthesizer.py)
- Performs periodic self-summarization checkpoints (via checkpoint.py)

This differs from PipelineOrchestrator in that it uses an LLM to dynamically decide which workers to invoke, rather than following a fixed stage sequence.

Message flow::

loom.goals.incoming --> OrchestratorActor.handle_message()
    --> GoalDecomposer breaks goal into subtasks
    --> Publishes TaskMessages to loom.tasks.incoming (one per subtask)
    --> Subscribes to loom.results.{goal_id} for worker responses
    --> ResultSynthesizer combines results into a coherent answer
    --> Publishes final TaskResult to loom.results.{goal_id}
Concurrency model

The max_concurrent_goals config setting (default 1) controls how many goals a single OrchestratorActor instance can process simultaneously. With the default of 1, goals are queued and processed one at a time (strict ordering). Higher values enable concurrent goal processing within a single instance. For horizontal scaling, run multiple OrchestratorActor instances with a NATS queue group.

All mutable state (conversation_history, checkpoint_counter) is per-goal inside GoalState, so concurrent goals are fully isolated — no shared mutable data, no locks required.

Within a single goal, subtasks are dispatched concurrently (all published to loom.tasks.incoming at once) and results are collected as they arrive.
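The fan-out/fan-in pattern above can be sketched with plain asyncio. This is an illustrative stand-in, not the Loom implementation: an in-memory queue replaces the NATS publish/subscribe plumbing, and worker latency is simulated with sleeps.

```python
import asyncio

async def dispatch_and_collect(subtask_ids: list[str]) -> list[str]:
    results: asyncio.Queue = asyncio.Queue()

    async def worker(task_id: str, delay: float) -> None:
        await asyncio.sleep(delay)            # simulated worker latency
        await results.put(task_id)

    # "Dispatch": start every subtask at once, like the batch publish.
    tasks = [
        asyncio.create_task(worker(tid, (len(subtask_ids) - i) * 0.01))
        for i, tid in enumerate(subtask_ids)
    ]
    # "Collect": take results in arrival order, not dispatch order.
    collected = [await results.get() for _ in subtask_ids]
    await asyncio.gather(*tasks)
    return collected

arrival_order = asyncio.run(dispatch_and_collect(["t1", "t2", "t3"]))
```

Because all subtasks start together, slow workers never block fast ones; only the final synthesis step waits for the full set.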

State tracking

The orchestrator is the ONLY stateful component in Loom. It maintains:

- _active_goals: maps goal_id -> GoalState for in-flight goals

Each GoalState carries its own conversation_history and checkpoint_counter so that concurrent goals never interfere.

Workers and the router are stateless by design.

See Also

loom.orchestrator.pipeline -- PipelineOrchestrator (fixed stage sequence)
loom.orchestrator.decomposer -- GoalDecomposer (LLM-based task breakdown)
loom.orchestrator.synthesizer -- ResultSynthesizer (result combination)
loom.orchestrator.checkpoint -- CheckpointManager (context compression)
loom.core.messages -- all message schemas

GoalState dataclass

GoalState(goal: OrchestratorGoal, dispatched_tasks: dict[str, TaskMessage] = dict(), collected_results: dict[str, TaskResult] = dict(), start_time: float = time.monotonic(), conversation_history: list[dict[str, Any]] = list(), checkpoint_counter: int = 0)

Tracks the lifecycle of a single goal through decomposition and collection.

One GoalState exists per in-flight goal. It is created when a goal arrives, populated during decomposition, updated as results trickle in, and discarded after synthesis completes.

Conversation history and checkpoint state are per-goal so that concurrent goals (max_concurrent_goals > 1) maintain fully isolated state — no shared mutable data, no locks required.

Attributes:

goal : OrchestratorGoal
    The original OrchestratorGoal message.
dispatched_tasks : dict[str, TaskMessage]
    Maps task_id -> TaskMessage for every subtask that was published to loom.tasks.incoming.
collected_results : dict[str, TaskResult]
    Maps task_id -> TaskResult for every result received on loom.results.{goal_id}.
start_time : float
    Monotonic timestamp when processing began.
conversation_history : list[dict[str, Any]]
    Accumulated context entries for checkpoint decisions. Each entry is a compact summary of a completed goal.
checkpoint_counter : int
    Monotonically increasing checkpoint version number for this goal's checkpoint chain.

all_collected property

all_collected: bool

True when every dispatched task has a corresponding result.

pending_count property

pending_count: int

Number of dispatched tasks still awaiting results.
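The documented shape of GoalState can be sketched as a plain dataclass. Field names and types are taken from the attribute descriptions above; the property bodies are an assumed implementation consistent with their documented semantics, not the source.

```python
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class GoalState:
    goal: Any                                 # OrchestratorGoal in the real code
    dispatched_tasks: dict[str, Any] = field(default_factory=dict)
    collected_results: dict[str, Any] = field(default_factory=dict)
    start_time: float = field(default_factory=time.monotonic)
    conversation_history: list[dict[str, Any]] = field(default_factory=list)
    checkpoint_counter: int = 0

    @property
    def all_collected(self) -> bool:
        # True once every dispatched task_id has a matching result.
        return set(self.dispatched_tasks) <= set(self.collected_results)

    @property
    def pending_count(self) -> int:
        return len(set(self.dispatched_tasks) - set(self.collected_results))

state = GoalState(goal=None)
state.dispatched_tasks = {"t1": object(), "t2": object()}
state.collected_results["t1"] = object()
```

Note the default_factory fields: each goal gets its own fresh dicts and lists, which is what makes concurrent goals isolated without locks.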

OrchestratorActor

OrchestratorActor(actor_id: str, config_path: str, backend: LLMBackend, nats_url: str = 'nats://nats:4222', checkpoint_store: CheckpointStore | None = None, *, bus: Any | None = None)

Bases: BaseActor

Dynamic orchestrator actor -- LLM-driven goal decomposition and synthesis.

Unlike :class:PipelineOrchestrator which follows a fixed stage sequence, this actor uses an LLM to dynamically reason about which workers to invoke and how to combine their results.

Lifecycle per goal:

  1. Receive -- parse the incoming dict as an OrchestratorGoal.
  2. Decompose -- call :class:GoalDecomposer to break the goal into a list of TaskMessage subtasks.
  3. Dispatch -- publish each subtask to loom.tasks.incoming so the router can forward them to the appropriate workers.
  4. Collect -- subscribe to loom.results.{goal_id} and gather TaskResult messages until all subtasks have responded or the timeout expires.
  5. Synthesize -- call :class:ResultSynthesizer to combine all collected results into a coherent final answer.
  6. Publish -- send the synthesized TaskResult to loom.results.{goal_id} for the original caller.
  7. Checkpoint (optional) -- if the accumulated conversation history exceeds the token threshold, compress it via :class:CheckpointManager.
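Step 4's stop condition (gather results until every subtask has responded or the timeout expires, keeping whatever arrived) can be sketched as a deadline loop; the in-memory queue here stands in for the NATS result subscription and is purely illustrative.

```python
import asyncio

async def collect(queue: asyncio.Queue, expected: int, timeout: float) -> list:
    results: list = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while len(results) < expected:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break                             # deadline hit: keep partial results
        try:
            results.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break
    return results

async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    await q.put("r1")                         # only one of two expected results arrives
    return await collect(q, expected=2, timeout=0.05)

partial = asyncio.run(demo())
```

Recomputing the remaining time on each iteration keeps the overall wait bounded by the single timeout rather than restarting it per result.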
Parameters

actor_id : str
    Unique identifier for this actor instance.
config_path : str
    Path to the orchestrator YAML config file (e.g. configs/orchestrators/default.yaml).
backend : LLMBackend
    LLM backend used for both decomposition and synthesis. Typically the same backend instance, but could be different tiers.
nats_url : str
    NATS server URL.
checkpoint_store : CheckpointStore | None
    Checkpoint persistence backend. Pass None to disable checkpointing.

Example::

from loom.worker.backends import OllamaBackend
from loom.contrib.redis.store import RedisCheckpointStore

backend = OllamaBackend(model="command-r7b:latest")
store = RedisCheckpointStore("redis://localhost:6379")
actor = OrchestratorActor(
    actor_id="orchestrator-1",
    config_path="configs/orchestrators/default.yaml",
    backend=backend,
    nats_url="nats://localhost:4222",
    checkpoint_store=store,
)
await actor.run("loom.goals.incoming")
Source code in src/loom/orchestrator/runner.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backend: LLMBackend,
    nats_url: str = "nats://nats:4222",
    checkpoint_store: CheckpointStore | None = None,
    *,
    bus: Any | None = None,
) -> None:
    # Load config first so we can read max_concurrent_goals before
    # passing it to BaseActor.
    self._config_path = config_path
    self.config = self._load_config(config_path)
    max_goals = self.config.get("max_concurrent_goals", 1)
    super().__init__(actor_id, nats_url, max_concurrent=max_goals, bus=bus)
    self.backend = backend

    # Build the decomposer from config-defined available workers.
    # Each entry needs at least "name" and "description".
    available_workers = self.config.get("available_workers", [])
    if not available_workers:
        # Fallback: infer from the system_prompt if no explicit list.
        # The default.yaml lists workers in the system prompt text; callers
        # should provide an explicit list for production use.
        logger.warning(
            "orchestrator.no_available_workers",
            hint="Add 'available_workers' list to orchestrator config",
        )
    self.decomposer = GoalDecomposer.from_worker_configs(
        backend=backend,
        configs=available_workers,
    )

    # Synthesizer uses the same backend for LLM-based synthesis.
    self.synthesizer = ResultSynthesizer(backend=backend)

    # Checkpoint manager -- only initialized if a checkpoint store is provided.
    checkpoint_config = self.config.get("checkpoint", {})
    self._checkpoint_manager: CheckpointManager | None = None
    if checkpoint_store is not None:
        self._checkpoint_manager = CheckpointManager(
            store=checkpoint_store,
            token_threshold=checkpoint_config.get("token_threshold", 50_000),
            recent_window_size=checkpoint_config.get("recent_window", 5),
        )

    # Configurable timeouts and concurrency limits from YAML.
    self._task_timeout: float = float(self.config.get("timeout_seconds", 300))
    self._max_concurrent_tasks: int = self.config.get("max_concurrent_tasks", 5)

    # ---------- Mutable state ----------
    # Active goals being processed.  Keyed by goal_id.
    # With max_concurrent_goals > 1, multiple goals can be in-flight
    # simultaneously.  Each goal's mutable state (conversation_history,
    # checkpoint_counter) is isolated inside its GoalState — no shared
    # mutable data between goals, no locks required.
    self._active_goals: dict[str, GoalState] = {}

on_reload async

on_reload() -> None

Re-read the orchestrator config from disk on reload signal.

Updates config-derived settings (timeouts, concurrency limits). Does not rebuild the decomposer or synthesizer — those are constructed from the backend, which doesn't change at runtime.

Source code in src/loom/orchestrator/runner.py
async def on_reload(self) -> None:
    """Re-read the orchestrator config from disk on reload signal.

    Updates config-derived settings (timeouts, concurrency limits).
    Does not rebuild the decomposer or synthesizer — those are
    constructed from the backend, which doesn't change at runtime.
    """
    self.config = self._load_config(self._config_path)
    self._task_timeout = float(self.config.get("timeout_seconds", 300))
    self._max_concurrent_tasks = self.config.get("max_concurrent_tasks", 5)
    logger.info("orchestrator.config_reloaded", config_path=self._config_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Handle an incoming OrchestratorGoal.

This is the main entry point called by :meth:BaseActor._process_one for every message received on loom.goals.incoming.

The method orchestrates the full goal lifecycle: parse, decompose, dispatch, collect, synthesize, publish. Errors at any stage result in a FAILED TaskResult published to the goal's result subject.

Parameters

data : dict[str, Any] Raw message dict, expected to conform to :class:OrchestratorGoal schema.

Source code in src/loom/orchestrator/runner.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Handle an incoming OrchestratorGoal.

    This is the main entry point called by :meth:`BaseActor._process_one`
    for every message received on ``loom.goals.incoming``.

    The method orchestrates the full goal lifecycle: parse, decompose,
    dispatch, collect, synthesize, publish.  Errors at any stage result
    in a ``FAILED`` TaskResult published to the goal's result subject.

    Parameters
    ----------
    data : dict[str, Any]
        Raw message dict, expected to conform to
        :class:`OrchestratorGoal` schema.
    """
    # -- 1. Parse --
    try:
        goal = OrchestratorGoal(**data)
    except Exception as e:
        logger.error(
            "orchestrator.parse_error",
            error=str(e),
            data_keys=list(data.keys()),
        )
        return

    log = logger.bind(goal_id=goal.goal_id)
    log.info("orchestrator.goal_received", instruction=goal.instruction[:120])

    goal_state = GoalState(goal=goal)
    self._active_goals[goal.goal_id] = goal_state

    try:
        # -- 2. Decompose --
        with _tracer.start_as_current_span(
            "orchestrator.decompose",
            attributes={"orchestrator.goal_id": goal.goal_id},
        ) as decompose_span:
            subtasks = await self._decompose_goal(goal, log)
            decompose_span.set_attribute(
                "orchestrator.subtask_count",
                len(subtasks) if subtasks else 0,
            )

        if not subtasks:
            log.warning("orchestrator.no_subtasks")
            await self._publish_final_result(
                goal,
                TaskStatus.FAILED,
                error="Decomposition produced no subtasks for this goal.",
            )
            return

        # Enforce max concurrent tasks limit.
        if len(subtasks) > self._max_concurrent_tasks:
            log.warning(
                "orchestrator.subtask_limit",
                requested=len(subtasks),
                limit=self._max_concurrent_tasks,
            )
            subtasks = subtasks[: self._max_concurrent_tasks]

        # -- 3. Dispatch --
        with _tracer.start_as_current_span(
            "orchestrator.dispatch",
            attributes={
                "orchestrator.goal_id": goal.goal_id,
                "orchestrator.subtask_count": len(subtasks),
            },
        ):
            await self._dispatch_subtasks(goal_state, subtasks, log)

        # -- 4. Collect results --
        with _tracer.start_as_current_span(
            "orchestrator.collect",
            attributes={
                "orchestrator.goal_id": goal.goal_id,
                "orchestrator.expected_count": len(goal_state.dispatched_tasks),
                "orchestrator.timeout_seconds": self._task_timeout,
            },
        ) as collect_span:
            results = await self._collect_results(goal_state, log)
            collect_span.set_attribute("orchestrator.collected_count", len(results))
            collect_span.set_attribute(
                "orchestrator.success_count",
                sum(1 for r in results if r.status == TaskStatus.COMPLETED),
            )

        # -- 5. Synthesize --
        with _tracer.start_as_current_span(
            "orchestrator.synthesize",
            attributes={
                "orchestrator.goal_id": goal.goal_id,
                "orchestrator.result_count": len(results),
            },
        ) as synth_span:
            synthesis = await self._synthesize_results(goal, results, log)
            synth_span.set_attribute(
                "orchestrator.confidence",
                synthesis.get("confidence", "unknown"),
            )

        # -- 6. Publish final result --
        elapsed = int((time.monotonic() - goal_state.start_time) * 1000)
        await self._publish_final_result(
            goal,
            TaskStatus.COMPLETED,
            output=synthesis,
            elapsed=elapsed,
        )
        log.info("orchestrator.goal_completed", ms=elapsed)

        # -- 7. Record in conversation history and check for checkpoint --
        await self._record_in_history(goal_state, results, synthesis)
        await self._maybe_checkpoint(goal_state, log)

    except Exception as e:
        log.error("orchestrator.goal_failed", error=str(e), exc_info=True)
        elapsed = int((time.monotonic() - goal_state.start_time) * 1000)
        await self._publish_final_result(
            goal,
            TaskStatus.FAILED,
            error=f"Orchestrator error: {e}",
            elapsed=elapsed,
        )
    finally:
        # Clean up goal state regardless of outcome.
        self._active_goals.pop(goal.goal_id, None)

Pipeline

pipeline

Pipeline orchestrator for multi-stage processing with automatic parallelism.

Executes a defined sequence of stages, passing results from each stage as input to later stages. Each stage maps to a worker_type. Stages can be LLM workers, processor workers, or any other actor — the pipeline doesn't care about the implementation, only the message contract.

Stage dependencies are automatically inferred from input_mapping paths: if stage B references "A.output.field", then B depends on A. Stages with no inter-stage dependencies (only goal.* paths) are independent and execute in parallel. Alternatively, explicit depends_on lists in the YAML config override automatic inference.

Execution proceeds in levels — each level contains stages whose dependencies are all satisfied by earlier levels. Stages within a level run concurrently via asyncio.wait(FIRST_COMPLETED) for incremental progress reporting.
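The inference and leveling described above can be illustrated with a small sketch. Stage names and mappings here are invented for the example; the real logic lives in the pipeline's dependency-inference and level-building steps.

```python
def infer_deps(stages: list[dict]) -> dict[str, set[str]]:
    # A path like "extract.output.text" makes the stage depend on "extract";
    # "goal.*" paths reference the incoming goal and create no dependency.
    names = {s["name"] for s in stages}
    deps: dict[str, set[str]] = {s["name"]: set() for s in stages}
    for s in stages:
        for path in s.get("input_mapping", {}).values():
            root = path.split(".", 1)[0]
            if root in names:                 # "goal" is not a stage name
                deps[s["name"]].add(root)
    return deps

def build_levels(stages: list[dict], deps: dict[str, set[str]]) -> list[list[str]]:
    # Kahn-style leveling: each level holds stages whose deps are satisfied.
    remaining = {s["name"] for s in stages}
    done: set[str] = set()
    levels: list[list[str]] = []
    while remaining:
        level = sorted(n for n in remaining if deps[n] <= done)
        if not level:
            raise ValueError("dependency cycle in pipeline stages")
        levels.append(level)
        done |= set(level)
        remaining -= set(level)
    return levels

stages = [
    {"name": "extract", "input_mapping": {"path": "goal.context.path"}},
    {"name": "ocr", "input_mapping": {"img": "goal.context.img"}},
    {"name": "summarize", "input_mapping": {"text": "extract.output.text"}},
]
levels = build_levels(stages, infer_deps(stages))
```

Here extract and ocr only read goal.* paths, so they land in level 0 and run in parallel; summarize depends on extract and runs in level 1.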

Pipeline definition comes from YAML config with stages, input mappings, and optional conditions.

Data flow through the pipeline::

OrchestratorGoal arrives at handle_message()
    ↓
context = { "goal": { "instruction": ..., "context": { ... } } }
    ↓
Build execution levels from stage dependencies (Kahn's algorithm)
    ↓
For each level:
    For each stage in level (concurrently if >1):
        1. Evaluate condition (skip if false)
        2. Build payload via input_mapping (dot-notation paths into context)
        3. Publish TaskMessage to loom.tasks.incoming
        4. Wait for TaskResult on loom.results.{goal_id}
        5. Store result: context[stage_name] = { "output": ..., ... }
    ↓
Publish final TaskResult with all stage outputs

Input mapping example (from doc_pipeline.yaml)::

input_mapping:
    text_preview: "extract.output.text_preview"
    metadata: "extract.output.metadata"

This resolves to::

payload["text_preview"] = context["extract"]["output"]["text_preview"]
payload["metadata"] = context["extract"]["output"]["metadata"]
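The resolution shown above amounts to walking dot-separated path segments through the accumulated context dict, roughly:

```python
def resolve(context: dict, path: str):
    # Walk "a.b.c" one segment at a time; a missing segment raises KeyError
    # (the real pipeline wraps such failures in PipelineMappingError).
    node = context
    for segment in path.split("."):
        node = node[segment]
    return node

context = {"extract": {"output": {"text_preview": "Q3 revenue grew...", "metadata": {"pages": 4}}}}
input_mapping = {"text_preview": "extract.output.text_preview", "metadata": "extract.output.metadata"}
payload = {key: resolve(context, path) for key, path in input_mapping.items()}
```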
See Also

loom.orchestrator.runner — dynamic LLM-based orchestrator
loom.core.messages.OrchestratorGoal — the input message type
configs/orchestrators/ — pipeline config YAML files

PipelineStageError

PipelineStageError(stage_name: str, message: str)

Bases: Exception

Raised when a pipeline stage fails or times out.

Source code in src/loom/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineTimeoutError

PipelineTimeoutError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when a pipeline stage times out waiting for a result.

Source code in src/loom/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineValidationError

PipelineValidationError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when input or output schema validation fails for a stage.

Source code in src/loom/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineWorkerError

PipelineWorkerError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when a worker returns FAILED status for a stage.

Source code in src/loom/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineMappingError

PipelineMappingError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when input_mapping resolution fails for a stage.

Source code in src/loom/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineOrchestrator

PipelineOrchestrator(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222', *, bus: Any | None = None)

Bases: BaseActor

Pipeline orchestrator with automatic stage parallelism.

Processes an OrchestratorGoal by running it through a series of stages organized into execution levels based on their dependencies. Stages within the same level run concurrently; levels execute sequentially. Stage outputs are accumulated in a context dict and can be referenced by subsequent stages via input_mapping.

Source code in src/loom/orchestrator/pipeline.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
    *,
    bus: Any | None = None,
) -> None:
    self._config_path = config_path
    self.config = self._load_config(config_path)
    max_goals = self.config.get("max_concurrent_goals", 1)
    super().__init__(actor_id, nats_url, max_concurrent=max_goals, bus=bus)

on_reload async

on_reload() -> None

Re-read the pipeline config from disk on reload signal.

Source code in src/loom/orchestrator/pipeline.py
async def on_reload(self) -> None:
    """Re-read the pipeline config from disk on reload signal."""
    self.config = self._load_config(self._config_path)
    logger.info("pipeline.config_reloaded", config_path=self._config_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Execute the pipeline for an incoming orchestrator goal.

Source code in src/loom/orchestrator/pipeline.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Execute the pipeline for an incoming orchestrator goal."""
    goal = OrchestratorGoal(**data)
    stages = self.config["pipeline_stages"]
    timeout = self.config.get("timeout_seconds", 300)

    log = logger.bind(
        goal_id=goal.goal_id,
        pipeline=self.config["name"],
        request_id=goal.request_id or goal.goal_id,
    )

    # Build execution levels from dependency graph.
    deps = self._infer_dependencies(stages)
    levels = self._build_execution_levels(stages, deps)

    # Log execution plan.
    level_summary = [[s["name"] for s in level] for level in levels]
    log.info(
        "pipeline.started",
        stages=len(stages),
        levels=len(levels),
        execution_plan=level_summary,
    )

    # Accumulated context: goal info + results from each completed stage.
    context: dict[str, Any] = {
        "goal": {
            "instruction": goal.instruction,
            "context": goal.context,
        },
    }

    start = time.monotonic()
    completed_stage_count = 0
    total_stage_count = len(stages)

    try:
        for level_idx, level in enumerate(levels):
            level_log = log.bind(level=level_idx)

            if len(level) == 1:
                # Single stage — no concurrency overhead.
                stage = level[0]
                name, result_dict = await self._execute_stage(
                    stage,
                    context,
                    goal,
                    timeout,
                    level_log,
                )
                if not result_dict.get("_skipped"):
                    context[name] = result_dict
                completed_stage_count += 1
                level_log.info(
                    "pipeline.stage_progress",
                    completed=completed_stage_count,
                    total=total_stage_count,
                )
            else:
                completed_stage_count = await self._execute_parallel_level(
                    level,
                    context,
                    goal,
                    timeout,
                    level_log,
                    completed_stage_count,
                    total_stage_count,
                )

    except PipelineStageError as e:
        # Build a brief input summary for diagnostics.
        stage_context = context.get("goal", {}).get("context")
        input_summary = repr(stage_context)[:200] if stage_context else ""
        log.error(
            "pipeline.stage_failed",
            stage=e.stage_name,
            error_type=type(e).__name__,
            error=str(e),
            input_summary=input_summary,
        )
        elapsed = int((time.monotonic() - start) * 1000)
        await self._publish_pipeline_result(
            goal,
            TaskStatus.FAILED,
            error=str(e),
            elapsed=elapsed,
        )
        return
    except Exception as e:
        log.error(
            "pipeline.unexpected_error",
            error=str(e),
            error_type=type(e).__name__,
        )
        elapsed = int((time.monotonic() - start) * 1000)
        await self._publish_pipeline_result(
            goal,
            TaskStatus.FAILED,
            error=f"Pipeline error ({type(e).__name__}): {e}",
            elapsed=elapsed,
        )
        return

    # All stages complete.
    elapsed = int((time.monotonic() - start) * 1000)
    log.info("pipeline.completed", ms=elapsed, stages_run=len(context) - 1)

    # Build final output from all stage results.
    final_output = {
        name: data["output"]
        for name, data in context.items()
        if name != "goal" and isinstance(data, dict) and "output" in data
    }

    # Build execution timeline for observability.
    timeline = [
        {
            "stage": name,
            "started_at": data.get("started_at"),
            "ended_at": data.get("ended_at"),
            "wall_time_ms": data.get("wall_time_ms"),
            "processing_time_ms": data.get("processing_time_ms"),
        }
        for name, data in context.items()
        if name != "goal" and isinstance(data, dict) and "started_at" in data
    ]

    await self._publish_pipeline_result(
        goal,
        TaskStatus.COMPLETED,
        output=final_output,
        elapsed=elapsed,
        timeline=timeline,
    )

Decomposer

decomposer

Task decomposition logic for orchestrators.

Responsible for breaking down complex goals into concrete subtasks that can be routed to individual workers.

This module is used by OrchestratorActor (runner.py), NOT by PipelineOrchestrator (which has its stages pre-defined in YAML).

The GoalDecomposer uses an LLM backend to analyze a high-level goal and produce a list of concrete TaskMessages, each targeting a specific worker_type. The LLM is given the goal instruction, domain context, and a manifest of available workers (names, descriptions, and input schemas) so it can make informed routing decisions and construct valid payloads.

The decomposition prompt asks the LLM to output structured JSON::

[
    {"worker_type": "extractor", "payload": {...}, "model_tier": "local"},
    {"worker_type": "summarizer", "payload": {...}, "model_tier": "local"},
    ...
]

Each entry maps directly to a TaskMessage. The parent_task_id is set to the caller-provided value so that worker results route back to the orchestrator via loom.results.{goal_id}.
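The parse-and-validate step can be sketched as follows. This is a simplified stand-in: the real decomposer builds full TaskMessage objects and logs each skipped entry, whereas this sketch just filters plain dicts.

```python
import json

def parse_plan(raw: str, known_workers: set[str]) -> list[dict]:
    """Parse an LLM plan; skip invalid entries instead of raising."""
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError:
        return []                             # unparseable response -> empty plan
    tasks = []
    for entry in plan if isinstance(plan, list) else []:
        if not isinstance(entry, dict):
            continue
        if entry.get("worker_type") not in known_workers:
            continue                          # unknown worker type: skip, don't crash
        tasks.append(entry)
    return tasks

raw = '[{"worker_type": "extractor", "payload": {}}, {"worker_type": "mystery", "payload": {}}]'
tasks = parse_plan(raw, known_workers={"extractor", "summarizer"})
```

Degrading to an empty list (rather than raising) is what lets the orchestrator report failure upstream instead of crashing on bad LLM output.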

See Also

loom.core.messages.TaskMessage — the output message type
loom.core.messages.OrchestratorGoal — the input message type
loom.worker.backends.LLMBackend — the LLM interface used for decomposition

WorkerDescriptor dataclass

WorkerDescriptor(name: str, description: str, input_schema: dict[str, Any] = dict(), default_tier: str = 'standard')

Metadata about an available worker type.

Used to ground the LLM's decomposition in what the system can actually execute. Typically constructed from a worker's YAML config file via the :meth:GoalDecomposer.from_worker_configs factory method.

Attributes:

name : str
    The worker_type identifier (e.g. "summarizer", "extractor"). Must match the name field in the worker's YAML config.
description : str
    One-line human-readable description of what the worker does.
input_schema : dict[str, Any]
    JSON Schema dict for the worker's expected payload. Included in the LLM prompt so it can construct valid payloads.
default_tier : str
    The default ModelTier string for this worker ("local", "standard", or "frontier").

to_prompt_block

to_prompt_block() -> str

Format this worker as a multi-line block for the LLM system prompt.

Includes the name, description, expected payload schema, and default model tier so the LLM knows exactly how to construct valid sub-tasks.

Source code in src/loom/orchestrator/decomposer.py
def to_prompt_block(self) -> str:
    """Format this worker as a multi-line block for the LLM system prompt.

    Includes the name, description, expected payload schema, and default
    model tier so the LLM knows exactly how to construct valid sub-tasks.
    """
    lines = [
        f"  Worker: {self.name}",
        f"    Description: {self.description}",
        f"    Default tier: {self.default_tier}",
    ]
    if self.input_schema:
        schema_str = json.dumps(self.input_schema, indent=2)
        indented = _indent(schema_str, 6)
        lines.append(f"    Input payload schema:\n{indented}")
    return "\n".join(lines)

GoalDecomposer

GoalDecomposer(backend: LLMBackend, workers: list[WorkerDescriptor], *, max_tokens: int = 2000, temperature: float = 0.0)

LLM-based goal decomposition.

Turns a high-level goal string into a list of TaskMessage objects ready for dispatch through the router.

The decomposer asks an LLM to plan which workers to invoke and how to parameterize each one. It then parses the structured JSON response into validated TaskMessage objects.

All parsing and validation failures are handled gracefully -- invalid sub-tasks are logged and skipped rather than crashing the orchestrator. If the entire LLM response is unparseable, an empty list is returned.

Parameters:

backend : LLMBackend (required)
    An LLM backend instance (OllamaBackend, AnthropicBackend, etc.) used to generate the decomposition plan.
workers : list[WorkerDescriptor] (required)
    List of WorkerDescriptor objects describing the available worker types. These are injected into the system prompt so the LLM knows what tools it can plan around.
max_tokens : int (default: 2000)
    Maximum tokens for the LLM response. Should be large enough to accommodate the full JSON plan.
temperature : float (default: 0.0)
    Sampling temperature. Low values (0.0--0.2) produce more deterministic plans. Defaults to 0.0 for reproducibility.

Example::

workers = [
    WorkerDescriptor(
        name="summarizer",
        description="Compresses text to structured summary",
        input_schema={"type": "object", "required": ["text"], ...},
        default_tier="local",
    ),
    WorkerDescriptor(
        name="extractor",
        description="Extracts structured fields from text",
        input_schema={"type": "object", "required": ["text", "fields"], ...},
        default_tier="standard",
    ),
]
decomposer = GoalDecomposer(backend=ollama_backend, workers=workers)
tasks = await decomposer.decompose(
    goal="Summarize this report and extract the key dates",
    context={"text": "...report content..."},
)
# tasks is a list[TaskMessage] ready for dispatch
Source code in src/loom/orchestrator/decomposer.py
def __init__(
    self,
    backend: LLMBackend,
    workers: list[WorkerDescriptor],
    *,
    max_tokens: int = 2000,
    temperature: float = 0.0,
) -> None:
    self._backend = backend
    self._workers = workers
    self._worker_names = {w.name for w in workers}
    self._system_prompt = _build_system_prompt(workers)
    self._max_tokens = max_tokens
    self._temperature = temperature

decompose async

decompose(goal: str, context: dict[str, Any] | None = None, *, parent_task_id: str | None = None, priority: TaskPriority = TaskPriority.NORMAL) -> list[TaskMessage]

Decompose a high-level goal into a list of TaskMessage objects.

Sends the goal and context to the LLM along with descriptions of all available workers. The LLM returns a JSON plan which is parsed and validated into TaskMessage instances.

This method never raises on LLM or parsing failures -- it logs the error and returns an empty list. The orchestrator can then decide whether to retry with different parameters or report failure upstream.

Parameters:

goal : str (required)
    Natural-language description of what needs to be accomplished.
context : dict[str, Any] | None (default: None)
    Optional domain-specific data dict (e.g. file references, category lists, full text content). Included verbatim in the LLM prompt so it can construct appropriate payloads.
parent_task_id : str | None (default: None)
    If this decomposition is part of a larger goal, all generated TaskMessages will carry this as their parent_task_id for result correlation. Typically set to OrchestratorGoal.goal_id.
priority : TaskPriority (default: NORMAL)
    Default priority for generated tasks. Individual tasks may override this if the LLM specifies a different priority.

Returns:

list[TaskMessage]
    A list of TaskMessage objects ready for dispatch to the router. Returns an empty list if:

    - The LLM backend call fails (network error, timeout, etc.)
    - The LLM response cannot be parsed as JSON
    - The LLM returns an empty plan
    - All sub-tasks fail validation (unknown worker types, etc.)
Source code in src/loom/orchestrator/decomposer.py
async def decompose(
    self,
    goal: str,
    context: dict[str, Any] | None = None,
    *,
    parent_task_id: str | None = None,
    priority: TaskPriority = TaskPriority.NORMAL,
) -> list[TaskMessage]:
    """Decompose a high-level goal into a list of TaskMessage objects.

    Sends the goal and context to the LLM along with descriptions of all
    available workers. The LLM returns a JSON plan which is parsed and
    validated into TaskMessage instances.

    This method never raises on LLM or parsing failures -- it logs the
    error and returns an empty list. The orchestrator can then decide
    whether to retry with different parameters or report failure upstream.

    Args:
        goal: Natural-language description of what needs to be accomplished.
        context: Optional domain-specific data dict (e.g. file references,
            category lists, full text content). Included verbatim in the
            LLM prompt so it can construct appropriate payloads.
        parent_task_id: If this decomposition is part of a larger goal,
            all generated TaskMessages will carry this as their
            ``parent_task_id`` for result correlation. Typically set to
            ``OrchestratorGoal.goal_id``.
        priority: Default priority for generated tasks. Individual tasks
            may override this if the LLM specifies a different priority.

    Returns:
        A list of TaskMessage objects ready for dispatch to the router.
        Returns an empty list if:

        - The LLM backend call fails (network error, timeout, etc.)
        - The LLM response cannot be parsed as JSON
        - The LLM returns an empty plan
        - All sub-tasks fail validation (unknown worker types, etc.)
    """
    log = logger.bind(
        goal_preview=goal[:120],
        parent_task_id=parent_task_id,
    )
    log.info("decomposer.starting", num_workers=len(self._workers))

    user_message = _build_user_message(goal, context)

    # -- Call the LLM backend --
    try:
        response = await self._backend.complete(
            system_prompt=self._system_prompt,
            user_message=user_message,
            max_tokens=self._max_tokens,
            temperature=self._temperature,
        )
    except Exception:
        log.exception("decomposer.llm_call_failed")
        return []

    raw_content = response["content"]
    model_used = response.get("model", "unknown")
    log.debug(
        "decomposer.llm_response",
        model=model_used,
        prompt_tokens=response.get("prompt_tokens"),
        completion_tokens=response.get("completion_tokens"),
        response_length=len(raw_content),
    )

    # -- Parse the JSON response into raw subtask dicts --
    try:
        raw_tasks = _extract_json_array(raw_content)
    except ValueError:
        log.error(
            "decomposer.json_parse_failed",
            raw_preview=raw_content[:300],
        )
        return []

    if not raw_tasks:
        log.info("decomposer.empty_plan", goal_preview=goal[:120])
        return []

    # -- Validate and convert each raw dict into a TaskMessage --
    tasks: list[TaskMessage] = []
    for i, raw_task in enumerate(raw_tasks):
        task = self._parse_subtask(
            raw_task,
            index=i,
            parent_task_id=parent_task_id,
            default_priority=priority,
        )
        if task is not None:
            tasks.append(task)

    log.info(
        "decomposer.completed",
        total_planned=len(raw_tasks),
        total_valid=len(tasks),
        worker_types=[t.worker_type for t in tasks],
    )
    return tasks
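The JSON-plan parsing step can be sketched in isolation. This is a hedged stand-in for the internal `_extract_json_array` helper, whose exact behavior may differ: accept a bare JSON array, or pull one out of surrounding prose.

```python
import json
import re

def extract_json_array(raw: str) -> list[dict]:
    """Tolerant-parsing sketch (illustrative, not the real helper)."""
    # Fast path: the whole response is already a valid JSON array.
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, list):
            return parsed
    except json.JSONDecodeError:
        pass
    # Fallback: grab the outermost [...] span and parse just that.
    match = re.search(r"\[.*\]", raw, re.DOTALL)
    if match is None:
        raise ValueError("no JSON array found in LLM response")
    return json.loads(match.group(0))

plan = extract_json_array(
    'Here is the plan:\n[{"worker_type": "summarizer", "payload": {"text": "..."}}]'
)
```

Each dict in the returned plan is then validated individually by `_parse_subtask`, so one malformed entry does not discard the whole plan.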

from_worker_configs classmethod

from_worker_configs(backend: LLMBackend, configs: list[dict[str, Any]], **kwargs: Any) -> GoalDecomposer

Build WorkerDescriptors from raw worker config dicts.

This avoids the caller having to manually construct WorkerDescriptor objects when the data is already available as parsed YAML configs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `LLMBackend` | The LLM backend to use for decomposition. | *required* |
| `configs` | `list[dict[str, Any]]` | List of worker config dicts, each containing at minimum `name` and `description` keys. Typically loaded from the worker YAML files in `configs/workers/`. | *required* |
| `**kwargs` | `Any` | Additional keyword arguments forwarded to the GoalDecomposer constructor (e.g. `max_tokens`, `temperature`). | `{}` |

Returns:

`GoalDecomposer`: A configured GoalDecomposer instance.

Example::

import yaml

with open("configs/workers/summarizer.yaml") as f:
    summarizer_cfg = yaml.safe_load(f)
with open("configs/workers/classifier.yaml") as f:
    classifier_cfg = yaml.safe_load(f)

decomposer = GoalDecomposer.from_worker_configs(
    backend=ollama_backend,
    configs=[summarizer_cfg, classifier_cfg],
)
Source code in src/loom/orchestrator/decomposer.py
@classmethod
def from_worker_configs(
    cls,
    backend: LLMBackend,
    configs: list[dict[str, Any]],
    **kwargs: Any,
) -> GoalDecomposer:
    """Build WorkerDescriptors from raw worker config dicts.

    This avoids the caller having to manually construct WorkerDescriptor
    objects when the data is already available as parsed YAML configs.

    Args:
        backend: The LLM backend to use for decomposition.
        configs: List of worker config dicts, each containing at minimum
            ``name`` and ``description`` keys. Typically loaded from the
            worker YAML files in ``configs/workers/``.
        **kwargs: Additional keyword arguments forwarded to the
            GoalDecomposer constructor (e.g. ``max_tokens``, ``temperature``).

    Returns:
        A configured GoalDecomposer instance.

    Example::

        import yaml

        with open("configs/workers/summarizer.yaml") as f:
            summarizer_cfg = yaml.safe_load(f)
        with open("configs/workers/classifier.yaml") as f:
            classifier_cfg = yaml.safe_load(f)

        decomposer = GoalDecomposer.from_worker_configs(
            backend=ollama_backend,
            configs=[summarizer_cfg, classifier_cfg],
        )
    """
    workers = [
        WorkerDescriptor(
            name=cfg["name"],
            description=cfg.get("description", "No description provided."),
            input_schema=cfg.get("input_schema", {}),
            default_tier=cfg.get("default_model_tier", "standard"),
        )
        for cfg in configs
    ]
    return cls(backend=backend, workers=workers, **kwargs)

Synthesizer

synthesizer

Result aggregation for orchestrators.

Responsible for combining results from multiple workers into a coherent final output.

This module is used by OrchestratorActor (runner.py), NOT by PipelineOrchestrator (which simply collects stage outputs into a dict).

Two modes of operation:

1. **Simple merge** (no LLM backend required)
   Partitions results into succeeded/failed, aggregates outputs into a
   structured dict with metadata.  Fast, deterministic, zero cost.

2. **LLM synthesis** (requires an LLM backend + a goal string)
   Sends the collected worker outputs to an LLM with instructions to
   produce a coherent narrative synthesis.  Use this when the orchestrator
   needs to present a unified answer to the user rather than a bag of
   sub-results.
Design decisions
  • Partial failures are first-class: every output dict contains both succeeded and failed sections so callers never lose visibility into what went wrong.
  • The LLM synthesis prompt is kept internal to this module; callers only pass the goal string and the list of TaskResults.
  • Token-budget awareness: if the combined result text is very large, the synthesizer truncates individual outputs before sending them to the LLM to avoid blowing the context window.

ResultSynthesizer

ResultSynthesizer(backend: LLMBackend | None = None, max_output_chars: int = _MAX_OUTPUT_CHARS)

Combines multiple worker :class:`TaskResult` objects into a final output.

The synthesizer operates in one of two modes depending on how it is constructed and invoked:

**Simple merge** (default, no LLM): Call :meth:`merge`, or call :meth:`synthesize` without a goal. Returns a structured dict with succeeded and failed sections plus aggregate metadata.

**LLM synthesis** (requires backend and a goal): Call :meth:`synthesize` with a goal string. The LLM receives the original goal, all worker outputs, and instructions to produce a unified answer.

Parameters

- `backend` (`LLMBackend | None`): An optional LLM backend (e.g. :class:`OllamaBackend`, :class:`AnthropicBackend`). When provided and a goal is passed to :meth:`synthesize`, the synthesizer uses the LLM to produce a coherent narrative. When None, only the deterministic merge is available.
- `max_output_chars` (`int`): Per-result character budget when building the LLM prompt. Outputs longer than this are truncated to avoid exceeding the model's context window. Defaults to :data:`_MAX_OUTPUT_CHARS`.

Example:

::

# Simple merge (no LLM)
synth = ResultSynthesizer()
merged = synth.merge(results)

# LLM synthesis
synth = ResultSynthesizer(backend=my_ollama_backend)
combined = await synth.synthesize(results, goal="Summarise the document")
Source code in src/loom/orchestrator/synthesizer.py
def __init__(
    self,
    backend: LLMBackend | None = None,
    max_output_chars: int = _MAX_OUTPUT_CHARS,
) -> None:
    self._backend = backend
    self._max_output_chars = max_output_chars

merge

merge(results: list[TaskResult]) -> dict[str, Any]

Deterministic merge of task results — no LLM involved.

Partitions results into succeeded and failed groups, extracts their outputs (or errors), and returns a structured dict with aggregate metadata.

Parameters

- `results` (`list[TaskResult]`): Worker results to merge. May be empty.

Returns:

`dict[str, Any]`: A dict with the following top-level keys:

- ``succeeded`` — list of dicts, each containing ``task_id``,
  ``worker_type``, ``output``, ``model_used``, and
  ``processing_time_ms`` for every completed result.
- ``failed`` — list of dicts, each containing ``task_id``,
  ``worker_type``, ``error``, and ``processing_time_ms`` for every
  failed result.
- ``metadata`` — aggregate statistics: ``total``, ``succeeded``,
  ``failed``, ``total_processing_time_ms``, ``models_used``, and
  ``total_tokens``.
Source code in src/loom/orchestrator/synthesizer.py
def merge(self, results: list[TaskResult]) -> dict[str, Any]:
    """Deterministic merge of task results — no LLM involved.

    Partitions *results* into succeeded and failed groups, extracts their
    outputs (or errors), and returns a structured dict with aggregate
    metadata.

    Parameters
    ----------
    results : list[TaskResult]
        Worker results to merge.  May be empty.

    Returns
    -------
    dict[str, Any]
        A dict with the following top-level keys:

        - ``succeeded`` — list of dicts, each containing ``task_id``,
          ``worker_type``, ``output``, ``model_used``, and
          ``processing_time_ms`` for every completed result.
        - ``failed`` — list of dicts, each containing ``task_id``,
          ``worker_type``, ``error``, and ``processing_time_ms`` for every
          failed result.
        - ``metadata`` — aggregate statistics: ``total``, ``succeeded``,
          ``failed``, ``total_processing_time_ms``, ``models_used``, and
          ``total_tokens``.
    """
    succeeded, failed = self._partition(results)

    succeeded_entries = [
        {
            "task_id": r.task_id,
            "worker_type": r.worker_type,
            "output": r.output,
            "model_used": r.model_used,
            "processing_time_ms": r.processing_time_ms,
        }
        for r in succeeded
    ]

    failed_entries = [
        {
            "task_id": r.task_id,
            "worker_type": r.worker_type,
            "error": r.error,
            "processing_time_ms": r.processing_time_ms,
        }
        for r in failed
    ]

    # Aggregate token usage across all results (succeeded or not).
    total_tokens: dict[str, int] = {}
    for r in results:
        for key, value in r.token_usage.items():
            total_tokens[key] = total_tokens.get(key, 0) + value

    # Collect distinct model identifiers (filter out None).
    models_used = sorted({r.model_used for r in results if r.model_used is not None})

    metadata = {
        "total": len(results),
        "succeeded": len(succeeded),
        "failed": len(failed),
        "total_processing_time_ms": sum(r.processing_time_ms for r in results),
        "models_used": models_used,
        "total_tokens": total_tokens,
    }

    if failed:
        logger.warning(
            "synthesizer.merge_partial_failure",
            total=len(results),
            failed=len(failed),
            failed_workers=[r.worker_type for r in failed],
        )
    else:
        logger.info(
            "synthesizer.merge_complete",
            total=len(results),
        )

    return {
        "succeeded": succeeded_entries,
        "failed": failed_entries,
        "metadata": metadata,
    }
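The merge contract can be illustrated with plain dicts standing in for TaskResult objects. This is an illustrative sketch of the output shape, not the real implementation; field names follow the entries documented above.

```python
# Hypothetical results -- the real code operates on TaskResult objects.
results = [
    {"task_id": "t1", "worker_type": "summarizer", "status": "completed",
     "error": None, "processing_time_ms": 120},
    {"task_id": "t2", "worker_type": "classifier", "status": "failed",
     "error": "timeout", "processing_time_ms": 5000},
]

# Partition into succeeded/failed -- partial failures stay visible.
succeeded = [r for r in results if r["status"] == "completed"]
failed = [r for r in results if r["status"] != "completed"]

merged = {
    "succeeded": succeeded,
    "failed": failed,
    "metadata": {
        "total": len(results),
        "succeeded": len(succeeded),
        "failed": len(failed),
        "total_processing_time_ms": sum(r["processing_time_ms"] for r in results),
    },
}
```

Because `failed` is always present (possibly empty), callers can branch on `merged["metadata"]["failed"]` without defensive key checks.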

synthesize async

synthesize(results: list[TaskResult], goal: str | None = None) -> dict[str, Any]

Combine worker results into a final coherent output.

If an LLM backend was provided at construction time and a goal string is supplied, the method delegates to :meth:`_llm_synthesize` which asks the LLM to produce a unified narrative. Otherwise it falls back to :meth:`merge`.

Parameters

- `results` (`list[TaskResult]`): Worker results to synthesize. May be empty (in which case the output will indicate that no results were available).
- `goal` (`str | None`): The original high-level goal that spawned these tasks. Required for LLM synthesis mode; ignored in merge mode.

Returns:

`dict[str, Any]`: In **merge mode** the return value is identical to :meth:`merge`.

In **LLM mode** the dict contains:

- ``synthesis`` — the LLM's coherent combined answer (str).
- ``confidence`` — ``"high"``, ``"medium"``, or ``"low"`` (str).
- ``conflicts`` — list of contradictions the LLM identified.
- ``gaps`` — list of missing information from failed tasks.
- ``succeeded`` / ``failed`` / ``metadata`` — same as merge mode.
- ``llm_metadata`` — model used and token counts for the synthesis
  call itself.
Source code in src/loom/orchestrator/synthesizer.py
async def synthesize(
    self,
    results: list[TaskResult],
    goal: str | None = None,
) -> dict[str, Any]:
    """Combine worker results into a final coherent output.

    If an LLM backend was provided at construction time **and** a *goal*
    string is supplied, the method delegates to :meth:`_llm_synthesize`
    which asks the LLM to produce a unified narrative.  Otherwise it falls
    back to :meth:`merge`.

    Parameters
    ----------
    results : list[TaskResult]
        Worker results to synthesize.  May be empty (in which case the
        output will indicate that no results were available).
    goal : str | None
        The original high-level goal that spawned these tasks.  Required
        for LLM synthesis mode; ignored in merge mode.

    Returns
    -------
    dict[str, Any]
        In **merge mode** the return value is identical to :meth:`merge`.

        In **LLM mode** the dict contains:

        - ``synthesis`` — the LLM's coherent combined answer (str).
        - ``confidence`` — ``"high"``, ``"medium"``, or ``"low"`` (str).
        - ``conflicts`` — list of contradictions the LLM identified.
        - ``gaps`` — list of missing information from failed tasks.
        - ``succeeded`` / ``failed`` / ``metadata`` — same as merge mode.
        - ``llm_metadata`` — model used and token counts for the synthesis
          call itself.
    """
    # Fast path: no results at all.
    if not results:
        logger.warning("synthesizer.no_results")
        return self.merge(results)

    # Decide mode.
    use_llm = self._backend is not None and goal is not None
    if not use_llm:
        return self.merge(results)

    return await self._llm_synthesize(results, goal)  # type: ignore[arg-type]

Checkpoint

checkpoint

Self-summarization checkpoint system for orchestrators.

The orchestrator's context is precious. This module compresses conversation history into structured state snapshots at defined intervals, allowing the orchestrator to "reboot" with a clean, compact understanding of where things stand.

Checkpoint trigger: when estimated token count exceeds threshold.

Storage: Pluggable via CheckpointStore (see orchestrator/store.py). Keys follow the pattern::

loom:checkpoint:{goal_id}:{checkpoint_number}  — versioned checkpoint
loom:checkpoint:{goal_id}:latest                — pointer to most recent

The orchestrator workflow with checkpoints::

1. Process goal, accumulate conversation_history
2. After each worker result: should_checkpoint(conversation_history)
3. If True: create_checkpoint() → compress state → persist to store
4. Orchestrator "reboots" with: system_prompt + format_for_injection(checkpoint)
   + last N interactions (recent_window_size)

This is conceptually similar to how Claude Code itself handles context compression — the key insight is the same: keep a structured summary + recent window rather than the full history.

This module is used by OrchestratorActor (runner.py).

PipelineOrchestrator does NOT use checkpoints because its sequential stage execution doesn't accumulate unbounded context.

Note: Token counting uses tiktoken with cl100k_base encoding (OpenAI's tokenizer). For Anthropic models, token counts are approximate (~10-15% estimation error). This is acceptable for checkpoint threshold decisions where exact counts are not critical.

CheckpointManager

CheckpointManager(store: CheckpointStore, token_threshold: int = 50000, recent_window_size: int = 5, encoding_name: str = 'cl100k_base', ttl_seconds: int = 86400)

Manages orchestrator state compression.

Workflow:

  1. After each worker result, estimate_tokens() checks context size
  2. If threshold exceeded, create_checkpoint() asks a summarizer to compress the current state
  3. The orchestrator restarts with: system_prompt + checkpoint + recent_window
Source code in src/loom/orchestrator/checkpoint.py
def __init__(
    self,
    store: CheckpointStore,
    token_threshold: int = 50_000,  # Trigger checkpoint at this count
    recent_window_size: int = 5,  # Keep last N interactions in detail
    encoding_name: str = "cl100k_base",
    ttl_seconds: int = 86400,  # Key expiry (default: 24h)
) -> None:
    self.store = store
    self.token_threshold = token_threshold
    self.recent_window_size = recent_window_size
    self.encoder = tiktoken.get_encoding(encoding_name)
    self.ttl_seconds = ttl_seconds

estimate_tokens

estimate_tokens(text: str) -> int

Estimate token count for a string.

Source code in src/loom/orchestrator/checkpoint.py
def estimate_tokens(self, text: str) -> int:
    """Estimate token count for a string."""
    return len(self.encoder.encode(text))

should_checkpoint

should_checkpoint(conversation_history: list[dict]) -> bool

Check if context has grown enough to trigger compression.

Source code in src/loom/orchestrator/checkpoint.py
def should_checkpoint(self, conversation_history: list[dict]) -> bool:
    """Check if context has grown enough to trigger compression."""
    total = sum(self.estimate_tokens(json.dumps(msg)) for msg in conversation_history)
    return total > self.token_threshold

create_checkpoint async

create_checkpoint(goal_id: str, original_instruction: str, completed_tasks: list[dict[str, Any]], pending_tasks: list[dict[str, Any]], open_issues: list[str], decisions_made: list[str], checkpoint_number: int) -> CheckpointState

Build a checkpoint.

The orchestrator or a dedicated summarizer compresses current state into this structure.

Source code in src/loom/orchestrator/checkpoint.py
async def create_checkpoint(
    self,
    goal_id: str,
    original_instruction: str,
    completed_tasks: list[dict[str, Any]],
    pending_tasks: list[dict[str, Any]],
    open_issues: list[str],
    decisions_made: list[str],
    checkpoint_number: int,
) -> CheckpointState:
    """Build a checkpoint.

    The orchestrator or a dedicated summarizer compresses current state
    into this structure.
    """
    # Build executive summary from completed task outcomes.
    # Only the last 20 tasks are included to keep the summary concise.
    # Of those, only the last 10 are rendered into the summary text.
    outcomes = []
    for t in completed_tasks[-20:]:
        status = t.get("status", "unknown")
        summary = t.get("summary", t.get("worker_type", "task"))
        outcomes.append(f"- [{status}] {summary}")

    executive_summary = (
        f"Goal: {original_instruction}\n"
        f"Progress: {len(completed_tasks)} completed, {len(pending_tasks)} pending.\n"
        f"Recent outcomes:\n" + "\n".join(outcomes[-10:])
    )

    total_tokens = self.estimate_tokens(executive_summary)

    checkpoint = CheckpointState(
        goal_id=goal_id,
        original_instruction=original_instruction,
        executive_summary=executive_summary,
        completed_tasks=[
            {
                "task_id": t["task_id"],
                "worker_type": t.get("worker_type"),
                "summary": t.get("summary", ""),
            }
            for t in completed_tasks
        ],
        pending_tasks=pending_tasks,
        open_issues=open_issues,
        decisions_made=decisions_made,
        context_token_count=total_tokens,
        checkpoint_number=checkpoint_number,
    )

    # Persist to store with configurable TTL (default 24h).
    # Long-running goals can increase ttl_seconds at construction time.
    key = f"loom:checkpoint:{goal_id}:{checkpoint_number}"
    await self.store.set(key, checkpoint.model_dump_json(), self.ttl_seconds)

    # Maintain a "latest" pointer so load_latest() doesn't need to scan.
    await self.store.set(f"loom:checkpoint:{goal_id}:latest", key, self.ttl_seconds)

    logger.info(
        "checkpoint.created",
        goal_id=goal_id,
        checkpoint_number=checkpoint_number,
        token_count=total_tokens,
    )
    return checkpoint

load_latest async

load_latest(goal_id: str) -> CheckpointState | None

Load the most recent checkpoint for a goal.

Source code in src/loom/orchestrator/checkpoint.py
async def load_latest(self, goal_id: str) -> CheckpointState | None:
    """Load the most recent checkpoint for a goal."""
    latest_key = await self.store.get(f"loom:checkpoint:{goal_id}:latest")
    if not latest_key:
        return None
    data = await self.store.get(latest_key)
    if not data:
        return None
    return CheckpointState.model_validate_json(data)

format_for_injection

format_for_injection(checkpoint: CheckpointState) -> str

Format checkpoint as context to inject into a fresh orchestrator session.

This is what the orchestrator sees when it "wakes up" after a checkpoint.

Source code in src/loom/orchestrator/checkpoint.py
def format_for_injection(self, checkpoint: CheckpointState) -> str:
    """Format checkpoint as context to inject into a fresh orchestrator session.

    This is what the orchestrator sees when it "wakes up" after a checkpoint.
    """
    sections = [
        f"=== CHECKPOINT #{checkpoint.checkpoint_number} ===",
        f"Original Goal: {checkpoint.original_instruction}",
        "",
        "--- Executive Summary ---",
        checkpoint.executive_summary,
        "",
        f"--- Decisions Made ({len(checkpoint.decisions_made)}) ---",
    ]
    sections.extend(f"  * {d}" for d in checkpoint.decisions_made)

    if checkpoint.open_issues:
        sections.append(f"\n--- Open Issues ({len(checkpoint.open_issues)}) ---")
        sections.extend(f"  ! {issue}" for issue in checkpoint.open_issues)

    sections.append(f"\n--- Pending Tasks ({len(checkpoint.pending_tasks)}) ---")
    sections.extend(f"  -> {t}" for t in checkpoint.pending_tasks)

    sections.append("\n=== END CHECKPOINT ===")
    return "\n".join(sections)
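For a concrete sense of what the orchestrator sees on "waking up", here is the same section layout applied to a tiny hypothetical checkpoint. A `SimpleNamespace` stands in for the `CheckpointState` model; all field values are invented for illustration.

```python
from types import SimpleNamespace

# Hypothetical checkpoint data -- real code uses the CheckpointState model.
cp = SimpleNamespace(
    checkpoint_number=2,
    original_instruction="Summarise the quarterly reports",
    executive_summary="Progress: 4 completed, 1 pending.",
    decisions_made=["Route all PDFs to the summarizer worker"],
    open_issues=["report-Q3.pdf failed extraction"],
    pending_tasks=[{"task_id": "t5", "worker_type": "summarizer"}],
)

# Same section layout as format_for_injection():
lines = [
    f"=== CHECKPOINT #{cp.checkpoint_number} ===",
    f"Original Goal: {cp.original_instruction}",
    "",
    "--- Executive Summary ---",
    cp.executive_summary,
    "",
    f"--- Decisions Made ({len(cp.decisions_made)}) ---",
]
lines.extend(f"  * {d}" for d in cp.decisions_made)
if cp.open_issues:
    lines.append(f"\n--- Open Issues ({len(cp.open_issues)}) ---")
    lines.extend(f"  ! {issue}" for issue in cp.open_issues)
lines.append(f"\n--- Pending Tasks ({len(cp.pending_tasks)}) ---")
lines.extend(f"  -> {t}" for t in cp.pending_tasks)
lines.append("\n=== END CHECKPOINT ===")
injected = "\n".join(lines)
```

The injected text is prepended to the system prompt along with the last `recent_window_size` interactions, so the fresh session starts with structured state rather than raw history.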

Store

store

Checkpoint storage abstraction.

Defines the CheckpointStore ABC and an in-memory implementation for testing. Production deployments use RedisCheckpointStore from loom.contrib.redis.store.

Storage contract

- `set(key, value, ttl_seconds)`: persist a string value with optional expiry
- `get(key)`: retrieve a string value (or None if missing/expired)

This is intentionally minimal. The CheckpointManager handles serialization and key naming; the store is just a key-value backend.

CheckpointStore

Bases: ABC

Abstract key-value store for checkpoint persistence.

Implementations must handle: - String key-value storage - TTL-based expiration (best-effort; lazy expiry is acceptable) - Returning None for missing or expired keys

set abstractmethod async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/loom/orchestrator/store.py
@abstractmethod
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    ...

get abstractmethod async

get(key: str) -> str | None

Retrieve a value, or None if missing/expired.

Source code in src/loom/orchestrator/store.py
@abstractmethod
async def get(self, key: str) -> str | None:
    """Retrieve a value, or None if missing/expired."""
    ...

InMemoryCheckpointStore

InMemoryCheckpointStore()

Bases: CheckpointStore

In-memory checkpoint store for testing and local development.

Values are stored in a dict with optional expiry timestamps. Expiry is checked lazily on get() — no background cleanup.

Source code in src/loom/orchestrator/store.py
def __init__(self) -> None:
    # Maps key -> (value, expires_at | None)
    self._data: dict[str, tuple[str, float | None]] = {}

set async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/loom/orchestrator/store.py
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    expires_at = time.monotonic() + ttl_seconds if ttl_seconds else None
    self._data[key] = (value, expires_at)

get async

get(key: str) -> str | None

Retrieve a value, or None if missing/expired.

Source code in src/loom/orchestrator/store.py
async def get(self, key: str) -> str | None:
    """Retrieve a value, or None if missing/expired."""
    entry = self._data.get(key)
    if entry is None:
        return None
    value, expires_at = entry
    if expires_at is not None and time.monotonic() > expires_at:
        del self._data[key]
        return None
    return value

Result Stream

stream

Streaming result collection for orchestrators.

Provides ResultStream, an async iterator that yields TaskResult objects as they arrive from the message bus — rather than blocking until all results are collected.

Two consumption modes:

1. **Batch** (backward compatible with pre-Strategy-A code)::

       stream = ResultStream(bus, subject, expected_ids, timeout)
       results = await stream.collect_all()

2. **Incremental** (new — enables progress callbacks and early exit)::

       stream = ResultStream(bus, subject, expected_ids, timeout,
                             on_result=my_progress_callback)
       async for result in stream:
           # process each result as it arrives
           ...

The on_result callback is invoked for every arriving result with the signature (result, collected_count, expected_count) -> bool | None. Returning True signals early exit — the stream stops collecting and the caller gets whatever has arrived so far.
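A minimal callback satisfying this signature might look like the following (the name and the stop-at-half policy are illustrative; any callable with this shape works):

```python
def progress_handler(result, collected: int, expected: int) -> bool:
    # Log progress; request early exit once half the results are in.
    # In real use `result` is a TaskResult; here any object works.
    print(f"[{collected}/{expected}] collected: {result!r}")
    return collected * 2 >= expected  # True -> stop collecting early
```

Pass it as `on_result=progress_handler` when constructing the ResultStream; returning `False` or `None` keeps collection going.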

This module is used by:

  • OrchestratorActor._collect_results() — dynamic orchestrator
  • Potentially by MCPBridge for richer progress reporting (future)

Design decisions:

  • Single-use: a ResultStream can only be iterated once (it owns the bus subscription lifecycle).
  • Callback errors are non-fatal: if on_result raises, the error is logged and collection continues.
  • Duplicate filtering: results for the same task_id are silently skipped (at-least-once delivery tolerance).
  • Unknown task_ids are ignored: only results matching expected_task_ids are collected.

ResultCallback

Bases: Protocol

Callback invoked when a result arrives during streaming collection.

Parameters

- `result` (`TaskResult`): The just-arrived result.
- `collected` (`int`): How many results have been collected so far (including this one).
- `expected` (`int`): Total number of expected results.

Returns:

`bool | None`: Return True to signal early exit (stop collecting). Return None or False to continue.

ResultStream

ResultStream(bus: MessageBus, subject: str, expected_task_ids: set[str], timeout: float, *, on_result: ResultCallback | None = None)

Async iterator that yields TaskResult objects as they arrive.

Wraps a bus subscription for a specific result subject, filtering incoming messages to only those matching expected_task_ids.

The stream terminates when:

  • All expected results have arrived, OR
  • The timeout expires, OR
  • The on_result callback returns True (early exit), OR
  • The subscription is closed.

After iteration, inspect :attr:`collected`, :attr:`timed_out`, and :attr:`early_exited` for post-mortem state.

Parameters

- `bus` (`MessageBus`): The message bus to subscribe on.
- `subject` (`str`): NATS subject to subscribe to (e.g. `loom.results.{goal_id}`).
- `expected_task_ids` (`set[str]`): Set of task_ids we expect results for.
- `timeout` (`float`): Maximum seconds to wait for all results.
- `on_result` (`ResultCallback | None`): Optional callback invoked as each result arrives.

Example:

::

stream = ResultStream(
    bus=nats_bus,
    subject=f"loom.results.{goal_id}",
    expected_task_ids={"task-1", "task-2", "task-3"},
    timeout=60.0,
    on_result=my_progress_handler,
)

# Batch mode (drop-in replacement for old collect):
results = await stream.collect_all()

# Or streaming mode:
async for result in stream:
    print(f"Got {result.worker_type}: {result.status}")
Source code in src/loom/orchestrator/stream.py
def __init__(
    self,
    bus: MessageBus,
    subject: str,
    expected_task_ids: set[str],
    timeout: float,
    *,
    on_result: ResultCallback | None = None,
) -> None:
    self._bus = bus
    self._subject = subject
    self._expected_ids = frozenset(expected_task_ids)
    self._timeout = timeout
    self._on_result = on_result

    # Mutable state — populated during iteration.
    self._collected: dict[str, TaskResult] = {}
    self._timed_out: bool = False
    self._early_exited: bool = False
    self._consumed: bool = False

collected property

collected: dict[str, TaskResult]

Map of task_id → TaskResult for all collected results.

expected_count property

expected_count: int

Number of results we expect.

collected_count property

collected_count: int

Number of results collected so far.

all_collected property

all_collected: bool

True when every expected result has arrived.

timed_out property

timed_out: bool

True if collection ended due to timeout.

early_exited property

early_exited: bool

True if collection ended due to on_result callback signaling stop.

pending_ids property

pending_ids: frozenset[str]

Task IDs that were expected but never arrived.

collect_all async

collect_all() -> list[TaskResult]

Consume the stream fully, returning all collected results as a list.

This is the backward-compatible entry point — it behaves identically to the pre-Strategy-A _collect_results() method.

Source code in src/loom/orchestrator/stream.py
async def collect_all(self) -> list[TaskResult]:
    """Consume the stream fully, returning all collected results as a list.

    This is the backward-compatible entry point — it behaves identically
    to the pre-Strategy-A ``_collect_results()`` method.
    """
    return [result async for result in self]

__aiter__

__aiter__() -> AsyncIterator[TaskResult]

Return the async iterator (delegates to `_stream()`).

Source code in src/loom/orchestrator/stream.py
def __aiter__(self) -> AsyncIterator[TaskResult]:
    """Return the async iterator (self — delegates to _stream)."""
    if self._consumed:
        raise RuntimeError(
            "ResultStream has already been consumed. "
            "Create a new ResultStream for another iteration."
        )
    self._consumed = True
    return self._stream()