Core

The loom.core package contains the foundational abstractions that all Loom components build on: actors, messages, configuration, I/O contracts, app manifests, and workspace file management.

You rarely use these directly — they're used by workers, orchestrators, and the CLI. See Building Workflows for the user-facing guide.

Actor

The base class for all NATS-connected actors (workers, routers, orchestrators).

actor

Base actor class — the foundation of Loom's actor model.

All Loom actors (workers, orchestrators, routers) inherit from BaseActor. This class handles the message bus subscription lifecycle, message dispatch, signal-based shutdown, and error isolation. Each actor is an independent process with no shared memory.

Design invariant: actors communicate ONLY through bus messages (see messages.py). Direct method calls between actors are forbidden.

The message bus is pluggable via the bus constructor parameter. The default is NATSBus (created from nats_url when no bus is provided). For testing, pass an InMemoryBus instead.

BaseActor

BaseActor(actor_id: str, nats_url: str = 'nats://nats:4222', max_concurrent: int = 1, *, bus: MessageBus | None = None)

Bases: ABC

Actor model base class.

Each actor:

  • Subscribes to a message bus subject
  • Processes messages with configurable concurrency (default: 1 = strict ordering)
  • Communicates only through structured messages
  • Has isolated state (no shared memory)
  • Shuts down gracefully on SIGTERM/SIGINT

Concurrency can be configured via max_concurrent. Values > 1 allow parallel message processing within a single actor instance — use with care, as it relaxes ordering guarantees. Horizontal scaling via queue groups (multiple replicas) is the preferred way to increase throughput while preserving per-message isolation.
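The bounded-concurrency behaviour can be illustrated with a plain-asyncio sketch (illustrative only — this is not BaseActor's actual implementation, and the handler body is a placeholder):

```python
import asyncio


async def process_bounded(messages: list[str], max_concurrent: int) -> list[str]:
    """Process messages with at most max_concurrent handlers in flight."""
    sem = asyncio.Semaphore(max_concurrent)
    done: list[str] = []

    async def handle(msg: str) -> None:
        async with sem:             # same bounding pattern the actor loop uses
            await asyncio.sleep(0)  # stand-in for real message handling
            done.append(msg)

    # gather() stands in for the actor's subscription loop
    await asyncio.gather(*(handle(m) for m in messages))
    return done
```

With `max_concurrent=1` the semaphore admits one handler at a time, so completion order matches arrival order; with larger values handlers may interleave, which is the ordering relaxation the paragraph above describes.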

The message bus can be injected via the bus keyword argument. If omitted, a NATSBus is created from nats_url (backward-compatible default).
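The pluggable-bus contract can be sketched as a tiny in-memory stand-in (a hypothetical minimal version for illustration only — the real InMemoryBus and MessageBus protocol have a larger surface, including subscription objects):

```python
import asyncio
from typing import Any


class TinyInMemoryBus:
    """Hypothetical minimal in-memory message bus for illustration."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue[dict[str, Any]]] = {}

    async def connect(self) -> None:
        pass  # nothing to open for an in-memory transport

    async def close(self) -> None:
        pass

    async def publish(self, subject: str, message: dict[str, Any]) -> None:
        self._queues.setdefault(subject, asyncio.Queue()).put_nowait(message)

    async def get(self, subject: str) -> dict[str, Any]:
        # Simplified stand-in for iterating a Subscription: pop one message.
        return await self._queues.setdefault(subject, asyncio.Queue()).get()


async def demo() -> dict[str, Any]:
    bus = TinyInMemoryBus()
    await bus.connect()
    await bus.publish("loom.tasks.echo", {"task_id": "t1"})
    return await bus.get("loom.tasks.echo")
```

Injecting a bus like this via the `bus` keyword lets tests exercise an actor's publish/subscribe behaviour without a running NATS server.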

Source code in src/loom/core/actor.py
def __init__(
    self,
    actor_id: str,
    nats_url: str = "nats://nats:4222",
    max_concurrent: int = 1,
    *,
    bus: MessageBus | None = None,
) -> None:
    self.actor_id = actor_id
    self.max_concurrent = max_concurrent
    self._sub: Subscription | None = None
    self._control_sub: Subscription | None = None
    self._running = False
    self._shutdown_event: asyncio.Event | None = None
    # Semaphore is created at run() time inside the event loop
    self._semaphore: asyncio.Semaphore | None = None
    self._background_tasks: set[asyncio.Task[None]] = set()

    if bus is not None:
        self._bus = bus
    else:
        from loom.bus.nats_adapter import NATSBus

        self._bus = NATSBus(nats_url)

connect async

connect() -> None

Connect to the message bus.

Source code in src/loom/core/actor.py
async def connect(self) -> None:
    """Connect to the message bus."""
    await self._bus.connect()
    logger.info("actor.connected", actor_id=self.actor_id)

disconnect async

disconnect() -> None

Unsubscribe and close the message bus connection.

Source code in src/loom/core/actor.py
async def disconnect(self) -> None:
    """Unsubscribe and close the message bus connection."""
    if self._control_sub:
        await self._control_sub.unsubscribe()
    if self._sub:
        await self._sub.unsubscribe()
    await self._bus.close()
    logger.info("actor.disconnected", actor_id=self.actor_id)

subscribe async

subscribe(subject: str, queue_group: str | None = None) -> None

Subscribe to a bus subject.

Queue group enables competing consumers (multiple worker replicas share load).

Source code in src/loom/core/actor.py
async def subscribe(self, subject: str, queue_group: str | None = None) -> None:
    """Subscribe to a bus subject.

    Queue group enables competing consumers
    (multiple worker replicas share load).
    """
    self._sub = await self._bus.subscribe(subject, queue_group)
    logger.info("actor.subscribed", actor_id=self.actor_id, subject=subject)

publish async

publish(subject: str, message: dict[str, Any]) -> None

Publish a message to the given bus subject.

Source code in src/loom/core/actor.py
async def publish(self, subject: str, message: dict[str, Any]) -> None:
    """Publish a message to the given bus subject."""
    await self._bus.publish(subject, message)

on_reload async

on_reload() -> None

Config reload hook — called when a control reload message arrives.

Subclasses override this to re-read their config from disk. The default implementation is a no-op.

Source code in src/loom/core/actor.py
async def on_reload(self) -> None:  # noqa: B027
    """Config reload hook — called when a control reload message arrives.

    Subclasses override this to re-read their config from disk.
    The default implementation is a no-op.
    """

run async

run(subject: str, queue_group: str | None = None) -> None

Main actor loop — subscribe, process messages, and handle shutdown.

This method blocks until a shutdown signal (SIGTERM/SIGINT) is received or the bus connection drops. Messages are processed with bounded concurrency controlled by max_concurrent (default 1 = strict ordering).

A background control listener subscribes to loom.control.reload to support hot-reloading of actor configs without restart.

Graceful shutdown sequence:

  1. Signal received -> _request_shutdown() sets the shutdown event
  2. Message loop breaks after finishing in-flight messages
  3. Control listener is cancelled
  4. Actor disconnects from the bus (drains pending publishes)

Source code in src/loom/core/actor.py
async def run(self, subject: str, queue_group: str | None = None) -> None:
    """Main actor loop — subscribe, process messages, and handle shutdown.

    This method blocks until a shutdown signal (SIGTERM/SIGINT) is received
    or the bus connection drops. Messages are processed with bounded
    concurrency controlled by max_concurrent (default 1 = strict ordering).

    A background control listener subscribes to ``loom.control.reload``
    to support hot-reloading of actor configs without restart.

    Graceful shutdown sequence:
    1. Signal received -> _request_shutdown() sets the shutdown event
    2. Message loop breaks after finishing in-flight messages
    3. Control listener is cancelled
    4. Actor disconnects from the bus (drains pending publishes)
    """
    self._shutdown_event = asyncio.Event()
    self._semaphore = asyncio.Semaphore(self.max_concurrent)

    await self.connect()
    await self.subscribe(subject, queue_group)
    self._running = True
    self._install_signal_handlers()

    # Start the control listener as a background task.
    control_task = asyncio.create_task(self._run_control_listener())

    logger.info(
        "actor.running",
        actor_id=self.actor_id,
        subject=subject,
        max_concurrent=self.max_concurrent,
    )

    try:
        async for data in self._sub:
            if not self._running:
                break
            if self.max_concurrent == 1:
                # Sequential processing — strict mailbox semantics
                await self._process_one(data)
            else:
                # Concurrent processing — fire-and-forget within semaphore bound
                task = asyncio.create_task(self._process_one(data))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
    except asyncio.CancelledError:
        pass  # Clean shutdown via task cancellation
    finally:
        self._running = False
        control_task.cancel()
        await self.disconnect()

handle_message abstractmethod async

handle_message(data: dict[str, Any]) -> None

Process a single message. Subclasses implement this.

Source code in src/loom/core/actor.py
@abstractmethod
async def handle_message(self, data: dict[str, Any]) -> None:
    """Process a single message. Subclasses implement this."""
    ...

Messages

Typed Pydantic models for all inter-actor communication: TaskMessage, TaskResult, OrchestratorGoal, CheckpointState, ModelTier, TaskStatus.

messages

Loom message schemas — the canonical wire format.

All inter-actor communication is typed through these Pydantic models. Actors ONLY communicate through these message types; raw dicts or ad-hoc JSON are forbidden. This enforces a contract-driven architecture where every message is validated at the moment it is constructed.

Message flow

Client/CLI ──OrchestratorGoal──> Orchestrator
Orchestrator ──TaskMessage──> Router ──TaskMessage──> Worker
Worker ──TaskResult──> Orchestrator

See Also

loom.core.contracts — JSON Schema validation for payload/output dicts
loom.bus.nats_adapter — NATS subject conventions for message routing

TaskPriority

Bases: StrEnum

Priority levels for task scheduling (not yet enforced by router).

TaskStatus

Bases: StrEnum

Lifecycle states for a task.

State transitions

PENDING -> PROCESSING
PROCESSING -> COMPLETED or FAILED
FAILED -> RETRY -> PROCESSING (not yet implemented)

ModelTier

Bases: StrEnum

Which model tier should handle this task.

Tiers map to backend instances configured at startup

LOCAL -> OllamaBackend (e.g., llama3.2:3b)
STANDARD -> AnthropicBackend (e.g., Claude Sonnet)
FRONTIER -> AnthropicBackend (e.g., Claude Opus)

The router may override the tier via tier_overrides in router_rules.yaml.

TaskMessage

Bases: BaseModel

Message sent TO a worker actor.

The payload dict must conform to the worker's input_schema (JSON Schema). Contract validation happens in TaskWorker.handle_message(), not here.

TaskResult

Bases: BaseModel

Message sent FROM a worker actor after processing.

Published to: loom.results.{parent_task_id or 'default'}
The output dict must conform to the worker's output_schema (JSON Schema).

OrchestratorGoal

Bases: BaseModel

Top-level goal submitted to an orchestrator.

Published to: loom.goals.incoming
The orchestrator (PipelineOrchestrator or OrchestratorActor) picks this up, decomposes it into TaskMessages, and synthesizes results.

The context dict carries domain-specific data (e.g., file_ref for doc processing).

CheckpointState

Bases: BaseModel

Compressed orchestrator state for self-summarization.

When the orchestrator's conversation history exceeds a token threshold, CheckpointManager compresses it into this structure and persists to Valkey. The orchestrator can then "reboot" with a fresh context containing only the checkpoint + a small recent-interactions window.

See: loom.orchestrator.checkpoint.CheckpointManager

Configuration

Config loading, validation, and schema-ref resolution. Validates worker, pipeline, orchestrator, and router YAML configs.

config

Configuration loading and validation utilities.

All Loom configs are YAML files. This module validates them at load time so misconfigurations fail fast at startup — not at first-message time.

Validation functions return a list of error strings (empty = valid). They do NOT raise; callers decide how to handle errors (log, abort, collect).

Four config families are validated here:

  • Worker configs — system prompt, I/O schemas, tier, timeout
  • Pipeline configs — stage names, worker_types, input_mapping, conditions
  • Orchestrator configs — name, system_prompt, checkpoint settings
  • Router rules — tier_overrides, rate_limits

Scheduler and MCP configs have dedicated validators in their own modules.
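To make the worker family concrete, here is a hedged illustrative fragment. Key names follow the validation rules in this module; the values, the tier spelling, and the dotted schema path are assumptions — configs/workers/_template.yaml remains the canonical reference:

```yaml
# Illustrative worker config — values are assumptions, not a shipped example
name: summarizer
worker_kind: llm                  # 'llm' (default) or 'processor'
system_prompt: "Summarize the input text."
default_model_tier: standard      # must be a valid tier (spelling assumed)
timeout_seconds: 120              # numeric bounds are checked: must be positive
reset_after_task: true            # must be true — workers are stateless
input_schema_ref: "myapp.schemas.SummaryInput"   # hypothetical dotted path
```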

See Also

configs/workers/_template.yaml — canonical worker config reference loom.scheduler.config — scheduler config validation loom.mcp.config — MCP gateway config validation

ConfigValidationError

Bases: Exception

Raised when a config file fails structural validation.

load_config

load_config(path: str | Path, *, resolve_refs: bool = True) -> dict[str, Any]

Load a YAML config file and return as a dict.

When resolve_refs is True (the default), input_schema_ref and output_schema_ref fields are resolved to JSON Schema via Pydantic model imports. Pass resolve_refs=False when the referenced modules may not be importable (e.g., during config-only validation).

Raises:

Type Description
FileNotFoundError

If the config file doesn't exist.

YAMLError

If the file contains invalid YAML.

ConfigValidationError

If the file parses to a non-dict (e.g., a list or scalar).

Source code in src/loom/core/config.py
def load_config(path: str | Path, *, resolve_refs: bool = True) -> dict[str, Any]:
    """Load a YAML config file and return as a dict.

    When *resolve_refs* is True (the default), ``input_schema_ref`` and
    ``output_schema_ref`` fields are resolved to JSON Schema via Pydantic
    model imports.  Pass ``resolve_refs=False`` when the referenced modules
    may not be importable (e.g., during config-only validation).

    Raises:
        FileNotFoundError: If the config file doesn't exist.
        yaml.YAMLError: If the file contains invalid YAML.
        ConfigValidationError: If the file parses to a non-dict (e.g., a list or scalar).
    """
    with open(path) as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ConfigValidationError(
            f"Config at {path}: expected YAML mapping, got {type(data).__name__}"
        )
    if resolve_refs:
        resolve_schema_refs(data)
    return data

resolve_schema_refs

resolve_schema_refs(config: dict[str, Any]) -> dict[str, Any]

Resolve input_schema_ref / output_schema_ref to JSON Schema.

If a config dict contains input_schema_ref or output_schema_ref (a dotted Python path to a Pydantic model), import the model and call .model_json_schema() to populate the corresponding input_schema / output_schema key.

An explicit input_schema / output_schema key always takes precedence — the ref is only used when the inline schema is absent.

This function mutates and returns config for convenience.

Raises:

Type Description
ConfigValidationError

If the referenced path cannot be imported or the target is not a Pydantic BaseModel subclass.

Source code in src/loom/core/config.py
def resolve_schema_refs(config: dict[str, Any]) -> dict[str, Any]:
    """Resolve ``input_schema_ref`` / ``output_schema_ref`` to JSON Schema.

    If a config dict contains ``input_schema_ref`` or ``output_schema_ref``
    (a dotted Python path to a Pydantic model), import the model and call
    ``.model_json_schema()`` to populate the corresponding ``input_schema``
    / ``output_schema`` key.

    An explicit ``input_schema`` / ``output_schema`` key always takes
    precedence — the ref is only used when the inline schema is absent.

    This function mutates and returns *config* for convenience.

    Raises:
        ConfigValidationError: If the referenced path cannot be imported
            or the target is not a Pydantic BaseModel subclass.
    """
    for schema_key, ref_key in (
        ("input_schema", "input_schema_ref"),
        ("output_schema", "output_schema_ref"),
    ):
        ref = config.get(ref_key)
        if ref is None:
            continue
        if schema_key in config:
            # Explicit inline schema takes priority — skip resolution.
            continue

        config[schema_key] = _import_pydantic_schema(ref, ref_key)

    # Also resolve schema_refs inside pipeline_stages.
    for stage in config.get("pipeline_stages", []):
        if isinstance(stage, dict):
            for schema_key, ref_key in (
                ("input_schema", "input_schema_ref"),
                ("output_schema", "output_schema_ref"),
            ):
                ref = stage.get(ref_key)
                if ref is None:
                    continue
                if schema_key in stage:
                    continue
                stage[schema_key] = _import_pydantic_schema(ref, ref_key)

    return config
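The dotted-path import at the heart of this resolution can be sketched generically (a hypothetical helper for illustration — not the module's actual _import_pydantic_schema, and without the Pydantic subclass check):

```python
from importlib import import_module


def import_dotted(path: str) -> object:
    """Resolve a dotted path like 'package.module.Attr' to the object it names."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError(f"not a dotted path: {path!r}")
    module = import_module(module_path)
    try:
        return getattr(module, attr)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attr!r}") from exc
```

The real resolver additionally verifies the target is a Pydantic BaseModel subclass and calls `.model_json_schema()` on it to produce the inline schema.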

validate_worker_config

validate_worker_config(config: dict[str, Any], path: str | Path = '<unknown>') -> list[str]

Validate a worker config dict.

Checks for:

  • Required keys (name; system_prompt for LLM workers, processing_backend for processor workers)
  • Correct types for all known keys
  • Valid model tier values
  • Valid JSON Schema structure for input_schema/output_schema
  • Knowledge silos structural integrity
  • Numeric bounds (timeouts, token limits)
Source code in src/loom/core/config.py
def validate_worker_config(  # noqa: PLR0912
    config: dict[str, Any], path: str | Path = "<unknown>"
) -> list[str]:
    """Validate a worker config dict.

    Checks for:

    - Required keys (name; system_prompt for LLM workers, processing_backend
      for processor workers)
    - Correct types for all known keys
    - Valid model tier values
    - Valid JSON Schema structure for input_schema/output_schema
    - Knowledge silos structural integrity
    - Numeric bounds (timeouts, token limits)
    """
    errors = _validate_base(config, _WORKER_REQUIRED, "worker", path)
    if not isinstance(config, dict):
        return errors

    pfx = f"worker config at {path}"

    # Determine worker kind — LLM workers need system_prompt, processors need backend.
    kind = config.get("worker_kind", "llm")
    if kind == "llm":
        if "system_prompt" not in config:
            errors.append(f"{pfx}: LLM worker missing required key 'system_prompt'")
        elif not isinstance(config["system_prompt"], str):
            errors.append(f"{pfx}: 'system_prompt' must be a string")
    elif kind == "processor":
        if "processing_backend" not in config:
            errors.append(f"{pfx}: processor worker missing 'processing_backend'")
        elif not isinstance(config["processing_backend"], str):
            errors.append(f"{pfx}: 'processing_backend' must be a string")
        else:
            errors.extend(_validate_processing_backend(config["processing_backend"], pfx))
    else:
        errors.append(f"{pfx}: 'worker_kind' must be 'llm' or 'processor', got '{kind}'")

    # Model tier
    tier = config.get("default_model_tier")
    if tier is not None and tier not in VALID_MODEL_TIERS:
        errors.append(
            f"{pfx}: 'default_model_tier' must be one of {sorted(VALID_MODEL_TIERS)}, got '{tier}'"
        )

    # Schema fields
    for schema_key in ("input_schema", "output_schema"):
        errors.extend(_validate_json_schema(config, schema_key, pfx))

    # Numeric bounds
    for key in ("timeout_seconds", "max_input_tokens", "max_output_tokens"):
        if key in config:
            val = config[key]
            if not isinstance(val, (int, float)) or isinstance(val, bool):
                errors.append(f"{pfx}: '{key}' must be a number, got {type(val).__name__}")
            elif val <= 0:
                errors.append(f"{pfx}: '{key}' must be positive, got {val}")

    # reset_after_task must be true (workers are stateless)
    rat = config.get("reset_after_task")
    if rat is not None and rat is not True:
        errors.append(f"{pfx}: 'reset_after_task' must be true (workers are stateless)")

    # Knowledge silos
    if "knowledge_silos" in config:
        errors.extend(_validate_knowledge_silos(config["knowledge_silos"], path))

    # Knowledge sources (legacy)
    if "knowledge_sources" in config:
        ks = config["knowledge_sources"]
        if not isinstance(ks, list):
            errors.append(f"{pfx}: 'knowledge_sources' must be a list")

    # File-ref resolution
    if "resolve_file_refs" in config:
        if not isinstance(config["resolve_file_refs"], list):
            errors.append(f"{pfx}: 'resolve_file_refs' must be a list of field names")
        if "workspace_dir" not in config:
            errors.append(f"{pfx}: 'resolve_file_refs' requires 'workspace_dir' to be set")

    return errors

validate_pipeline_config

validate_pipeline_config(config: dict[str, Any], path: str | Path = '<unknown>') -> list[str]

Validate a pipeline orchestrator config.

Checks for:

  • Required keys (name, pipeline_stages)
  • Each stage has name and worker_type
  • Stage names are unique
  • input_mapping values are valid dot-notation paths
  • depends_on references exist as stage names
  • condition syntax is valid (3-part: path op value)
  • Tier values are valid
  • No circular dependencies (basic check)
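A hedged sketch of a pipeline config that satisfies these checks (stage names, worker types, and paths are invented for illustration):

```yaml
# Illustrative pipeline config — names and values are assumptions
name: doc_pipeline
pipeline_stages:
  - name: extract
    worker_type: extractor
    input_mapping:
      text: goal.context.file_ref     # dot-notation path rooted at 'goal'
  - name: summarize
    worker_type: summarizer
    depends_on: [extract]             # must reference an existing stage
    input_mapping:
      text: extract.summary           # or a preceding stage's output
    condition: extract.status == ok   # 3-part 'path op value'; op is == or !=
```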

Source code in src/loom/core/config.py
def validate_pipeline_config(  # noqa: PLR0912, PLR0915
    config: dict[str, Any], path: str | Path = "<unknown>"
) -> list[str]:
    """Validate a pipeline orchestrator config.

    Checks for:
    - Required keys (name, pipeline_stages)
    - Each stage has name and worker_type
    - Stage names are unique
    - input_mapping values are valid dot-notation paths
    - depends_on references exist as stage names
    - condition syntax is valid (3-part: path op value)
    - Tier values are valid
    - No circular dependencies (basic check)
    """
    errors = _validate_base(config, _PIPELINE_REQUIRED, "pipeline", path)
    if not isinstance(config, dict):
        return errors

    pfx = f"pipeline config at {path}"
    stages = config.get("pipeline_stages", [])
    if not isinstance(stages, list):
        return errors  # Already caught by _validate_base

    # Timeout
    if "timeout_seconds" in config:
        _check_positive_number(config, "timeout_seconds", pfx, errors)

    # max_concurrent_goals
    mcg = config.get("max_concurrent_goals")
    if mcg is not None and (not isinstance(mcg, int) or isinstance(mcg, bool) or mcg < 1):
        errors.append(f"{pfx}: 'max_concurrent_goals' must be a positive integer")

    # Collect stage names for cross-reference validation.
    stage_names: set[str] = set()
    seen_names: list[str] = []

    for i, stage in enumerate(stages):
        sp = f"{pfx}: pipeline_stages[{i}]"
        if not isinstance(stage, dict):
            errors.append(f"{sp}: expected dict, got {type(stage).__name__}")
            continue

        # Required fields
        sname = stage.get("name")
        if sname is None:
            errors.append(f"{sp}: missing required key 'name'")
        elif not isinstance(sname, str):
            errors.append(f"{sp}: 'name' must be a string")
        else:
            if sname in stage_names:
                errors.append(f"{sp}: duplicate stage name '{sname}'")
            stage_names.add(sname)
            seen_names.append(sname)

        if "worker_type" not in stage:
            errors.append(f"{sp}: missing required key 'worker_type'")
        elif not isinstance(stage["worker_type"], str):
            errors.append(f"{sp}: 'worker_type' must be a string")

        # Tier validation
        tier = stage.get("tier")
        if tier is not None and tier not in VALID_MODEL_TIERS:
            errors.append(f"{sp}: 'tier' must be one of {sorted(VALID_MODEL_TIERS)}, got '{tier}'")

        # input_mapping validation
        mapping = stage.get("input_mapping")
        if mapping is not None:
            if not isinstance(mapping, dict):
                errors.append(f"{sp}: 'input_mapping' must be a dict")
            else:
                for target, source_path in mapping.items():
                    if not isinstance(source_path, str):
                        errors.append(f"{sp}: input_mapping['{target}'] must be a string path")
                    elif not source_path:
                        errors.append(f"{sp}: input_mapping['{target}'] must not be empty")
                    elif (
                        isinstance(source_path, str)
                        and source_path
                        and not (source_path.startswith("'") and source_path.endswith("'"))
                    ):
                        # Validate source path references goal.* or an existing stage
                        # (literal values wrapped in single quotes are skipped)
                        root = source_path.split(".")[0]
                        if root != "goal" and root not in stage_names:
                            errors.append(
                                f"{sp}: input_mapping['{target}'] references unknown "
                                f"source '{root}' (must be 'goal' or a preceding "
                                f"stage name)"
                            )

        # depends_on validation
        deps = stage.get("depends_on")
        if deps is not None:
            if not isinstance(deps, list):
                errors.append(f"{sp}: 'depends_on' must be a list")
            else:
                for dep in deps:
                    if not isinstance(dep, str):
                        errors.append(f"{sp}: depends_on entries must be strings")
                    elif dep not in stage_names and dep not in seen_names:
                        errors.append(f"{sp}: depends_on references unknown stage '{dep}'")

        # condition syntax check
        cond = stage.get("condition")
        if cond is not None:
            if not isinstance(cond, str):
                errors.append(f"{sp}: 'condition' must be a string")
            else:
                parts = cond.split()
                if len(parts) != 3:
                    errors.append(
                        f"{sp}: 'condition' must be 'path op value' "
                        f"(3 space-separated parts), got {len(parts)} parts"
                    )
                elif parts[1] not in ("==", "!="):
                    errors.append(
                        f"{sp}: condition operator must be '==' or '!=', got '{parts[1]}'"
                    )

        # Per-stage timeout
        if "timeout_seconds" in stage:
            _check_positive_number(stage, "timeout_seconds", sp, errors)

        # Per-stage I/O schemas (for inter-stage contract validation)
        for schema_key in ("input_schema", "output_schema"):
            if schema_key in stage:
                errors.extend(_validate_json_schema(stage, schema_key, sp))

    return errors

validate_orchestrator_config

validate_orchestrator_config(config: dict[str, Any], path: str | Path = '<unknown>') -> list[str]

Validate a dynamic orchestrator (OrchestratorActor) config.

Checks for:

  • Required keys (name)
  • system_prompt presence (warned, not error — could be in config)
  • checkpoint settings structure
  • Numeric bounds (timeouts, concurrency)
  • available_workers structure
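A hedged illustration of the orchestrator family (field names follow the checks in this validator; all values are assumptions):

```yaml
# Illustrative orchestrator config — values are assumptions
name: research_orchestrator
system_prompt: "Decompose goals into worker tasks."
max_concurrent_goals: 2    # positive integer
checkpoint:
  token_threshold: 50000   # positive integer: compress history past this
  recent_window: 5         # non-negative integer: interactions kept verbatim
available_workers:
  - name: summarizer       # 'name' and 'description' are required
    description: "Summarizes documents"
```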

Source code in src/loom/core/config.py
def validate_orchestrator_config(  # noqa: PLR0912
    config: dict[str, Any], path: str | Path = "<unknown>"
) -> list[str]:
    """Validate a dynamic orchestrator (OrchestratorActor) config.

    Checks for:
    - Required keys (name)
    - system_prompt presence (warned, not error — could be in config)
    - checkpoint settings structure
    - Numeric bounds (timeouts, concurrency)
    - available_workers structure
    """
    errors = _validate_base(config, _ORCHESTRATOR_REQUIRED, "orchestrator", path)
    if not isinstance(config, dict):
        return errors

    pfx = f"orchestrator config at {path}"

    # System prompt (required for LLM-driven orchestrator)
    if "system_prompt" not in config:
        errors.append(f"{pfx}: missing 'system_prompt' (required for LLM orchestrator)")
    elif not isinstance(config["system_prompt"], str):
        errors.append(f"{pfx}: 'system_prompt' must be a string")

    # Checkpoint settings
    cp = config.get("checkpoint")
    if cp is not None:
        if not isinstance(cp, dict):
            errors.append(f"{pfx}: 'checkpoint' must be a dict")
        else:
            tt = cp.get("token_threshold")
            if tt is not None and (not isinstance(tt, int) or isinstance(tt, bool) or tt < 1):
                errors.append(f"{pfx}: checkpoint.token_threshold must be a positive integer")
            rw = cp.get("recent_window")
            if rw is not None and (not isinstance(rw, int) or isinstance(rw, bool) or rw < 0):
                errors.append(f"{pfx}: checkpoint.recent_window must be a non-negative integer")

    # Concurrency and timeout
    for key in ("max_concurrent_goals", "max_concurrent_tasks"):
        val = config.get(key)
        if val is not None and (not isinstance(val, int) or isinstance(val, bool) or val < 1):
            errors.append(f"{pfx}: '{key}' must be a positive integer")

    if "timeout_seconds" in config:
        _check_positive_number(config, "timeout_seconds", pfx, errors)

    # available_workers list
    aw = config.get("available_workers")
    if aw is not None:
        if not isinstance(aw, list):
            errors.append(f"{pfx}: 'available_workers' must be a list")
        else:
            for i, w in enumerate(aw):
                wp = f"{pfx}: available_workers[{i}]"
                if not isinstance(w, dict):
                    errors.append(f"{wp}: expected dict")
                    continue
                if "name" not in w:
                    errors.append(f"{wp}: missing required key 'name'")
                if "description" not in w:
                    errors.append(f"{wp}: missing required key 'description'")

    return errors

validate_router_rules

validate_router_rules(config: dict[str, Any], path: str | Path = '<unknown>') -> list[str]

Validate router_rules.yaml.

Checks for:

  • Top-level must be a dict
  • tier_overrides values are valid ModelTier values
  • rate_limits have valid structure (max_concurrent > 0)
  • rate_limits keys are valid tier names
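A hedged router_rules.yaml sketch (the worker type and the tier spellings are assumptions):

```yaml
# Illustrative router rules — values are assumptions
tier_overrides:
  summarizer: frontier        # force this worker_type onto a tier
rate_limits:
  standard:                   # keys must be valid tier names
    max_concurrent: 4         # positive integer
    tokens_per_minute: 100000 # positive number
```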

Source code in src/loom/core/config.py
def validate_router_rules(  # noqa: PLR0912
    config: dict[str, Any], path: str | Path = "<unknown>"
) -> list[str]:
    """Validate router_rules.yaml.

    Checks for:
    - Top-level must be a dict
    - tier_overrides values are valid ModelTier values
    - rate_limits have valid structure (max_concurrent > 0)
    - rate_limits keys are valid tier names
    """
    errors: list[str] = []
    if not isinstance(config, dict):
        return [f"router rules at {path}: expected dict, got {type(config).__name__}"]

    pfx = f"router rules at {path}"

    # tier_overrides
    overrides = config.get("tier_overrides")
    if overrides is not None:
        if not isinstance(overrides, dict):
            errors.append(f"{pfx}: 'tier_overrides' must be a dict")
        else:
            for wtype, tier in overrides.items():
                if not isinstance(tier, str):
                    errors.append(f"{pfx}: tier_overrides['{wtype}'] must be a string")
                elif tier not in VALID_MODEL_TIERS:
                    errors.append(
                        f"{pfx}: tier_overrides['{wtype}'] = '{tier}' is not a "
                        f"valid tier (must be one of {sorted(VALID_MODEL_TIERS)})"
                    )

    # rate_limits
    limits = config.get("rate_limits")
    if limits is not None:
        if not isinstance(limits, dict):
            errors.append(f"{pfx}: 'rate_limits' must be a dict")
        else:
            for tier_name, limit_cfg in limits.items():
                lp = f"{pfx}: rate_limits['{tier_name}']"
                if tier_name not in VALID_MODEL_TIERS:
                    errors.append(
                        f"{lp}: unknown tier (must be one of {sorted(VALID_MODEL_TIERS)})"
                    )
                if not isinstance(limit_cfg, dict):
                    errors.append(f"{lp}: must be a dict")
                    continue
                mc = limit_cfg.get("max_concurrent")
                if mc is not None and (not isinstance(mc, int) or isinstance(mc, bool) or mc < 1):
                    errors.append(f"{lp}: 'max_concurrent' must be a positive integer")
                tpm = limit_cfg.get("tokens_per_minute")
                if tpm is not None and (
                    not isinstance(tpm, (int, float)) or isinstance(tpm, bool) or tpm <= 0
                ):
                    errors.append(f"{lp}: 'tokens_per_minute' must be a positive number")

    return errors

Contracts

Input/output contract validation. Ensures messages match their declared JSON Schema, with correct bool/int distinction.

contracts

Lightweight JSON Schema validation for I/O contracts.

We avoid the jsonschema dependency — this covers the 90% case (required fields + shallow type checks for object properties). This is intentionally simple: every worker has an input_schema and output_schema defined in its YAML config, and this module validates payloads against those schemas at message boundaries.

Limitations (by design — keeps the dependency tree minimal):

- No nested object validation (only top-level properties)
- No array item type validation
- No min/max, pattern, enum, or format constraints
- No $ref or schema composition (allOf, oneOf, anyOf)

If you need full Draft 2020-12 validation, add jsonschema to dependencies and swap this module. The validate_input/validate_output API stays the same.

validate_input

validate_input(data: dict[str, Any], schema: dict) -> list[str]

Validate a task's input payload against the worker's input_schema.

Returns an empty list if valid, or a list of human-readable error strings.

Source code in src/loom/core/contracts.py
def validate_input(data: dict[str, Any], schema: dict) -> list[str]:
    """Validate a task's input payload against the worker's input_schema.

    Returns an empty list if valid, or a list of human-readable error strings.
    """
    return _validate(data, schema, "input")

validate_output

validate_output(data: dict[str, Any], schema: dict) -> list[str]

Validate a worker's output against its output_schema.

Returns an empty list if valid, or a list of human-readable error strings.

Source code in src/loom/core/contracts.py
def validate_output(data: dict[str, Any], schema: dict) -> list[str]:
    """Validate a worker's output against its output_schema.

    Returns an empty list if valid, or a list of human-readable error strings.
    """
    return _validate(data, schema, "output")
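The shallow validation described above — required fields plus top-level type checks, with the bool/int distinction — can be sketched standalone. `shallow_validate` below is illustrative; it is not the library's internal `_validate`:

```python
# Standalone sketch of the shallow I/O-contract validation: check required
# fields, then top-level property types. Illustrative, not the loom code.
def shallow_validate(data: dict, schema: dict) -> list[str]:
    errors = []
    for field in schema.get("required", []):
        if field not in data:
            errors.append(f"missing required field '{field}'")
    type_map = {"string": str, "integer": int, "number": (int, float),
                "boolean": bool, "object": dict, "array": list}
    for name, spec in schema.get("properties", {}).items():
        if name in data and "type" in spec:
            val = data[name]
            # bool is an int subclass: reject booleans for integer/number
            if spec["type"] in ("integer", "number") and isinstance(val, bool):
                errors.append(f"'{name}' must be {spec['type']}, got bool")
            elif not isinstance(val, type_map.get(spec["type"], object)):
                errors.append(f"'{name}' must be {spec['type']}")
    return errors

schema = {"required": ["text"],
          "properties": {"text": {"type": "string"},
                         "count": {"type": "integer"}}}
assert shallow_validate({"text": "hi", "count": 3}, schema) == []
assert shallow_validate({"count": True}, schema)  # missing 'text', bool count
```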

Manifest

App bundle manifest model (AppManifest). Used by the Workshop's app deployment system for ZIP bundle validation.

manifest

App manifest schema for Loom application bundles.

A Loom app is a ZIP archive containing worker/pipeline/scheduler configs, optional scripts, and a manifest.yaml describing the app metadata and entry points. Apps are deployed via the Workshop UI or CLI.

Manifest format::

name: "myapp"
version: "1.0.0"
description: "My Loom application"
loom_version: ">=0.4.0"
required_extras: [duckdb, mcp]
python_package:          # optional — for apps with Python code
  name: "myapp"
  install_path: "src/"
entry_configs:
  workers:
    - config: "configs/workers/my_worker.yaml"
      tier: "standard"
  pipelines:
    - config: "configs/orchestrators/my_pipeline.yaml"
  schedulers:
    - config: "configs/schedulers/my_schedule.yaml"
  mcp:
    - config: "configs/mcp/my_mcp.yaml"
scripts:
  - path: "scripts/setup.py"
    description: "Initial setup script"

PythonPackage

Bases: BaseModel

Optional Python package included in the app bundle.

EntryConfigRef

Bases: BaseModel

Reference to a config file within the app bundle.

EntryConfigs

Bases: BaseModel

Entry point configurations that the app exposes.

ScriptRef

Bases: BaseModel

Reference to a script included in the app bundle.

AppManifest

Bases: BaseModel

Loom application manifest.

Describes the contents and metadata of a Loom app bundle (ZIP archive). The manifest is stored as manifest.yaml at the root of the ZIP.

validate_name classmethod

validate_name(v: str) -> str

App name must be a valid identifier (lowercase, hyphens, underscores).

Source code in src/loom/core/manifest.py
@field_validator("name")
@classmethod
def validate_name(cls, v: str) -> str:
    """App name must be a valid identifier (lowercase, hyphens, underscores)."""
    import re

    if not re.match(r"^[a-z][a-z0-9_-]*$", v):
        msg = (
            f"App name '{v}' must start with a lowercase letter and contain "
            "only lowercase letters, digits, hyphens, and underscores"
        )
        raise ValueError(msg)
    return v
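A quick check of the name pattern used above (the regex also permits digits after the first character, which the one-line summary does not mention):

```python
import re

# Same pattern as validate_name: lowercase start, then lowercase
# letters, digits, hyphens, or underscores.
pat = re.compile(r"^[a-z][a-z0-9_-]*$")
assert pat.match("my-app_2")
assert not pat.match("MyApp")    # uppercase rejected
assert not pat.match("2app")     # must start with a letter
```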

validate_version classmethod

validate_version(v: str) -> str

Version must be semantic (major.minor.patch, optional pre-release).

Source code in src/loom/core/manifest.py
@field_validator("version")
@classmethod
def validate_version(cls, v: str) -> str:
    """Version must be semantic (major.minor.patch, optional pre-release)."""
    import re

    if not re.match(r"^\d+\.\d+\.\d+", v):
        msg = f"Version '{v}' must follow semantic versioning (e.g., '1.0.0')"
        raise ValueError(msg)
    return v
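Note the pattern is anchored only at the start of the string, which is what makes the optional pre-release suffix work:

```python
import re

# Same pattern as validate_version: matches major.minor.patch at the
# start of the string; anything after (e.g. "-rc.1") is permitted.
pat = re.compile(r"^\d+\.\d+\.\d+")
assert pat.match("1.0.0")
assert pat.match("1.0.0-rc.1")   # pre-release suffix allowed
assert not pat.match("v1.0")
```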

validate_app_manifest

validate_app_manifest(data: dict[str, Any]) -> list[str]

Validate a manifest dict.

Returns a list of error strings (empty = valid).

Source code in src/loom/core/manifest.py
def validate_app_manifest(data: dict[str, Any]) -> list[str]:
    """Validate a manifest dict.

    Returns a list of error strings (empty = valid).
    """
    errors: list[str] = [
        f"Missing required field: {f}" for f in ("name", "version", "description") if f not in data
    ]

    if errors:
        return errors

    # Try parsing with Pydantic
    try:
        manifest = AppManifest(**data)
    except Exception as e:
        errors.append(f"Manifest validation error: {e}")
        return errors

    # Warn about unknown extras (non-fatal)
    for extra in manifest.required_extras:
        if extra not in KNOWN_EXTRAS:
            logger.warning("manifest.unknown_extra", extra=extra, app=manifest.name)

    # Verify config file extensions
    _check_yaml_extension(manifest.entry_configs.workers, "Worker", errors)
    _check_yaml_extension(manifest.entry_configs.pipelines, "Pipeline", errors)
    _check_yaml_extension(manifest.entry_configs.schedulers, "Scheduler", errors)
    _check_yaml_extension(manifest.entry_configs.mcp, "MCP", errors)

    return errors
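The first pass above short-circuits on missing top-level fields before Pydantic parsing runs, so a sparse manifest reports all missing basics at once. Sketched standalone:

```python
# Mirrors the required-field pre-check in validate_app_manifest.
def missing_fields(data: dict) -> list[str]:
    return [f"Missing required field: {f}"
            for f in ("name", "version", "description") if f not in data]

assert missing_fields({"name": "myapp"}) == [
    "Missing required field: version",
    "Missing required field: description",
]
```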

load_manifest

load_manifest(path: Path) -> AppManifest

Load and validate a manifest from a YAML file.

Raises:

Type Description
FileNotFoundError

If the manifest file doesn't exist.

ValueError

If the manifest is invalid.

Source code in src/loom/core/manifest.py
def load_manifest(path: Path) -> AppManifest:
    """Load and validate a manifest from a YAML file.

    Raises:
        FileNotFoundError: If the manifest file doesn't exist.
        ValueError: If the manifest is invalid.
    """
    if not path.exists():
        raise FileNotFoundError(f"Manifest not found: {path}")

    with open(path) as f:
        data = yaml.safe_load(f)

    if not isinstance(data, dict):
        msg = f"Manifest must be a YAML mapping, got {type(data).__name__}"
        raise ValueError(msg)

    errors = validate_app_manifest(data)
    if errors:
        msg = f"Invalid manifest: {'; '.join(errors)}"
        raise ValueError(msg)

    return AppManifest(**data)

Workspace

File-ref resolution with path traversal protection. Maps file_ref fields in task payloads to actual file contents.

workspace

Workspace manager for file-ref resolution and safe file I/O.

Many Loom pipelines pass large data between stages via file references rather than inlining content in NATS messages. This module provides a centralized utility for resolving those references safely:

- Path traversal protection (prevents ``../../../etc/passwd`` escapes)
- File existence validation
- JSON read/write helpers with structured error handling

Usage from a ProcessingBackend::

from loom.core.workspace import WorkspaceManager

ws = WorkspaceManager("/tmp/my-workspace")
path = ws.resolve("report.pdf")           # validated absolute path
data = ws.read_json("report_extracted.json")  # parsed dict
ws.write_json("output.json", {"key": "value"})

Usage from worker config YAML (for LLMWorker file-ref resolution)::

workspace_dir: "/tmp/my-workspace"
resolve_file_refs: ["file_ref"]   # payload fields to resolve

See Also

loom.worker.runner.LLMWorker — resolves file_refs before building prompt
loom.worker.processor.SyncProcessingBackend — base class for sync backends

WorkspaceManager

WorkspaceManager(workspace_dir: str | Path)

Manages file operations within a bounded workspace directory.

All file access is restricted to the workspace boundary. Any attempt to reference a file outside the workspace (via ../ or symlinks) raises ValueError.

Attributes:

Name Type Description
workspace_dir

Absolute path to the workspace root directory.

Source code in src/loom/core/workspace.py
def __init__(self, workspace_dir: str | Path) -> None:
    self.workspace_dir = Path(workspace_dir)

resolve

resolve(file_ref: str) -> Path

Resolve a file reference to a validated absolute path.

Parameters:

Name Type Description Default
file_ref str

Relative filename within the workspace (e.g., "report.pdf" or "subdir/data.json").

required

Returns:

Type Description
Path

Absolute resolved path guaranteed to be within the workspace.

Raises:

Type Description
ValueError

If the resolved path escapes the workspace boundary (path traversal attack).

FileNotFoundError

If the resolved file does not exist.

Source code in src/loom/core/workspace.py
def resolve(self, file_ref: str) -> Path:
    """Resolve a file reference to a validated absolute path.

    Args:
        file_ref: Relative filename within the workspace
            (e.g., ``"report.pdf"`` or ``"subdir/data.json"``).

    Returns:
        Absolute resolved path guaranteed to be within the workspace.

    Raises:
        ValueError: If the resolved path escapes the workspace
            boundary (path traversal attack).
        FileNotFoundError: If the resolved file does not exist.
    """
    resolved = (self.workspace_dir / file_ref).resolve()
    if not resolved.is_relative_to(self.workspace_dir.resolve()):
        raise ValueError(f"Path traversal detected: {file_ref}")
    if not resolved.exists():
        raise FileNotFoundError(f"File not found in workspace: {file_ref}")
    return resolved
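The traversal guard can be demonstrated in a throwaway directory. This sketch mirrors the logic above rather than importing loom, and uses `Path.is_relative_to` (Python 3.9+):

```python
import json
import tempfile
from pathlib import Path

# Throwaway workspace with one file in it.
ws_dir = Path(tempfile.mkdtemp())
(ws_dir / "data.json").write_text(json.dumps({"ok": True}))

def resolve(file_ref: str) -> Path:
    """Mirror of WorkspaceManager.resolve: reject paths escaping ws_dir."""
    resolved = (ws_dir / file_ref).resolve()
    if not resolved.is_relative_to(ws_dir.resolve()):
        raise ValueError(f"Path traversal detected: {file_ref}")
    if not resolved.exists():
        raise FileNotFoundError(f"File not found in workspace: {file_ref}")
    return resolved

assert resolve("data.json").exists()
try:
    resolve("../escape.txt")   # resolves outside the workspace
except ValueError:
    pass                       # traversal rejected before any file access
```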

read_json

read_json(file_ref: str) -> dict[str, Any]

Read and parse a JSON file from the workspace.

Parameters:

Name Type Description Default
file_ref str

Relative filename of a JSON file in the workspace.

required

Returns:

Type Description
dict[str, Any]

Parsed JSON content as a dict.

Raises:

Type Description
ValueError

If path traversal is detected.

FileNotFoundError

If the file does not exist.

JSONDecodeError

If the file is not valid JSON.

Source code in src/loom/core/workspace.py
def read_json(self, file_ref: str) -> dict[str, Any]:
    """Read and parse a JSON file from the workspace.

    Args:
        file_ref: Relative filename of a JSON file in the workspace.

    Returns:
        Parsed JSON content as a dict.

    Raises:
        ValueError: If path traversal is detected.
        FileNotFoundError: If the file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = self.resolve(file_ref)
    return json.loads(path.read_text())

read_text

read_text(file_ref: str) -> str

Read text content from a workspace file.

Parameters:

Name Type Description Default
file_ref str

Relative filename in the workspace.

required

Returns:

Type Description
str

File contents as a string.

Raises:

Type Description
ValueError

If path traversal is detected.

FileNotFoundError

If the file does not exist.

Source code in src/loom/core/workspace.py
def read_text(self, file_ref: str) -> str:
    """Read text content from a workspace file.

    Args:
        file_ref: Relative filename in the workspace.

    Returns:
        File contents as a string.

    Raises:
        ValueError: If path traversal is detected.
        FileNotFoundError: If the file does not exist.
    """
    path = self.resolve(file_ref)
    return path.read_text()

write_json

write_json(filename: str, data: dict[str, Any]) -> Path

Write a dict as JSON to the workspace.

Parameters:

Name Type Description Default
filename str

Target filename within the workspace.

required
data dict[str, Any]

Dict to serialize as JSON.

required

Returns:

Type Description
Path

Absolute path to the written file.

Raises:

Type Description
OSError

If the write fails (disk full, permissions, etc.).

Source code in src/loom/core/workspace.py
def write_json(self, filename: str, data: dict[str, Any]) -> Path:
    """Write a dict as JSON to the workspace.

    Args:
        filename: Target filename within the workspace.
        data: Dict to serialize as JSON.

    Returns:
        Absolute path to the written file.

    Raises:
        OSError: If the write fails (disk full, permissions, etc.).
    """
    path = self.workspace_dir / filename
    path.write_text(json.dumps(data, indent=2))
    return path
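A round trip through write_json and read_json leaves indent-2 JSON on disk. The equivalent standalone steps (not importing loom):

```python
import json
import tempfile
from pathlib import Path

ws = Path(tempfile.mkdtemp())

# write_json equivalent: serialize with indent=2 and keep the path.
path = ws / "output.json"
path.write_text(json.dumps({"key": "value"}, indent=2))

# read_json equivalent: parse the file back into a dict.
assert json.loads(path.read_text()) == {"key": "value"}
```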