Worker

The loom.worker package implements the two types of Loom workers:

  • LLM Workers (runner.py) — call language models with system prompts, tool-use loops, and JSON parsing. Used for summarization, classification, extraction, and analysis tasks.
  • Processor Workers (processor.py) — run arbitrary Python code (no LLM). Used for data transformation, ingestion, and integration tasks.

Both types are stateless: they process one task, return a result, and reset.

See Building Workflows for the user-facing guide.

Base

Abstract base class for all workers (TaskWorker).

base

TaskWorker base class.

Extracts the reusable worker lifecycle from the LLM-specific worker: message parsing, I/O contract validation, result publishing, timing, and error handling. Subclasses implement process() to do the actual work.
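The I/O contract comes from the worker config file passed as config_path. A hypothetical minimal config is sketched below — the key names (input_schema, output_schema, system_prompt, name, max_output_tokens) are the ones this page documents; the values are illustrative:

```yaml
# Hypothetical worker config — key names follow those read by
# TaskWorker/LLMWorker on this page; values are illustrative.
name: summarizer
system_prompt: |
  Summarize the provided text. Respond with JSON: {"summary": "..."}.
input_schema:
  type: object
  required: [text]
  properties:
    text: {type: string}
output_schema:
  type: object
  required: [summary]
  properties:
    summary: {type: string}
max_output_tokens: 1000
```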

TaskWorker

TaskWorker(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222')

Bases: BaseActor

Generic stateless worker base.

Lifecycle per message:

  1. Receive TaskMessage
  2. Validate input against worker contract
  3. Delegate to process() (subclass implements)
  4. Validate output against worker contract
  5. Publish TaskResult
  6. Reset (no state retained)

Source code in src/loom/worker/base.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
) -> None:
    super().__init__(actor_id, nats_url)
    self._config_path = config_path
    self.config = self._load_config(config_path)

on_reload async

on_reload() -> None

Re-read the worker config from disk on reload signal.

Source code in src/loom/worker/base.py
async def on_reload(self) -> None:
    """Re-read the worker config from disk on reload signal."""
    self.config = self._load_config(self._config_path)
    logger.info("worker.config_reloaded", config_path=self._config_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Handle an incoming task message through the full worker lifecycle.

Source code in src/loom/worker/base.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Handle an incoming task message through the full worker lifecycle."""
    task = TaskMessage(**data)
    start = time.monotonic()

    log = logger.bind(
        task_id=task.task_id,
        worker_type=task.worker_type,
        model_tier=task.model_tier.value,
    )

    try:
        # 1. Validate input
        errors = validate_input(task.payload, self.config.get("input_schema", {}))
        if errors:
            await self._publish_result(
                task, TaskStatus.FAILED, error=f"Input validation: {errors}"
            )
            return

        # 2. Delegate to subclass — inject model_tier into metadata
        #    so process() can resolve the correct LLM backend.
        enriched_metadata = {**task.metadata, "model_tier": task.model_tier.value}
        result = await self.process(task.payload, enriched_metadata)

        # 3. Validate output
        output = result["output"]
        output_errors = validate_output(output, self.config.get("output_schema", {}))
        if output_errors:
            await self._publish_result(
                task,
                TaskStatus.FAILED,
                error=f"Output validation: {output_errors}",
                model_used=result.get("model_used"),
                tokens=result.get("token_usage"),
            )
            return

        # 4. Publish success
        elapsed = int((time.monotonic() - start) * 1000)
        await self._publish_result(
            task,
            TaskStatus.COMPLETED,
            output=output,
            model_used=result.get("model_used"),
            tokens=result.get("token_usage"),
            elapsed=elapsed,
        )
        log.info("worker.completed", ms=elapsed)

    except Exception as e:
        log.error("worker.exception", error=str(e))
        await self._publish_result(task, TaskStatus.FAILED, error=str(e))

    # Reset — worker holds NO state from this task.
    # This is a design invariant, not a suggestion. Any instance variables
    # set during process() must not affect subsequent invocations.
    await self.reset()

process abstractmethod async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Process a task payload. Subclasses implement this.

Parameters:

Name Type Description Default
payload dict[str, Any]

Validated input dict (matches input_schema).

required
metadata dict[str, Any]

Task metadata dict (routing hints, pipeline context, etc.).

required

Returns:

Type Description
dict[str, Any]

A dict with the following structure:

    {
        "output": dict,              # Must match output_schema
        "model_used": str | None,    # Identifier for what processed this
        "token_usage": dict | None,  # {"prompt_tokens": int, ...} or empty
    }

Source code in src/loom/worker/base.py
@abstractmethod
async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Process a task payload. Subclasses implement this.

    Args:
        payload: Validated input dict (matches input_schema).
        metadata: Task metadata dict (routing hints, pipeline context, etc.).

    Returns:
        A dict with the following structure::

            {
                "output": dict,              # Must match output_schema
                "model_used": str | None,    # Identifier for what processed this
                "token_usage": dict | None,  # {"prompt_tokens": int, ...} or empty
            }
    """
    ...
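To make the return contract concrete, here is a sketch of a process() implementation. It is shown standalone — in practice the method lives on a TaskWorker subclass — and the payload fields and "word_count" output are hypothetical:

```python
import asyncio
from typing import Any

# Illustrative process() implementation — shown standalone; a real worker
# would define this on a TaskWorker subclass. Field names are hypothetical.
async def process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    text = payload["text"]
    return {
        "output": {"word_count": len(text.split())},  # must match output_schema
        "model_used": None,    # no LLM involved in this processor-style example
        "token_usage": None,
    }

result = asyncio.run(process({"text": "one two three"}, {"model_tier": "standard"}))
```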

reset async

reset() -> None

Post-task cleanup hook for subclasses.

Called after every task (success or failure) to release temporary resources (file handles, caches, scratch buffers). The default implementation is a no-op. Override in subclasses that allocate per-task resources in process().

This is the enforcement point for the statelessness invariant: any state set during process() must be cleared here.

Source code in src/loom/worker/base.py
async def reset(self) -> None:
    """Post-task cleanup hook for subclasses.

    Called after every task (success or failure) to release temporary
    resources (file handles, caches, scratch buffers). The default
    implementation is a no-op. Override in subclasses that allocate
    per-task resources in process().

    This is the enforcement point for the statelessness invariant:
    any state set during process() must be cleared here.
    """

Runner

LLMWorker — the main LLM worker actor. Includes execute_with_tools(), the standalone tool-use loop shared with the Workshop test bench.

runner

LLM worker actor.

Processes a single task via an LLM backend and resets. No state carries between tasks — this is enforced, not optional.

Supports tool-use: when knowledge_silos include tool-type entries, the worker offers those tools to the LLM and executes a multi-turn loop until the LLM produces a final text answer.

LLMWorker

LLMWorker(actor_id: str, config_path: str, backends: dict[str, LLMBackend], nats_url: str = 'nats://nats:4222')

Bases: TaskWorker

LLM-backed stateless worker.

Extends TaskWorker with LLM-specific logic:

  • Builds prompt from system_prompt + JSON payload
  • Loads knowledge silos (folder content → system prompt, tools → function-calling)
  • Calls the appropriate LLM backend by model tier
  • Executes multi-turn tool-use loop when tools are available
  • Parses JSON output from the LLM response (with fence-stripping fallback)
  • Applies silo_updates for writable folder silos

Source code in src/loom/worker/runner.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backends: dict[str, LLMBackend],
    nats_url: str = "nats://nats:4222",
) -> None:
    super().__init__(actor_id, config_path, nats_url)
    self.backends = backends

process async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Build prompt, call LLM with tool-use loop, and parse structured output.

Source code in src/loom/worker/runner.py
async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Build prompt, call LLM with tool-use loop, and parse structured output."""
    # 1. Build prompt
    system_prompt = self.config["system_prompt"]

    # 1a. Knowledge silo injection — load folder silos into system prompt
    silos = self.config.get("knowledge_silos", [])
    if silos:
        from loom.worker.knowledge import load_knowledge_silos

        silo_text = load_knowledge_silos(silos)
        if silo_text:
            system_prompt = silo_text + "\n\n" + system_prompt

    # 1b. Legacy knowledge injection — prepend loaded knowledge to system prompt
    knowledge_sources = self.config.get("knowledge_sources", [])
    if knowledge_sources:
        from loom.worker.knowledge import load_knowledge_sources

        knowledge_text = load_knowledge_sources(knowledge_sources)
        if knowledge_text:
            system_prompt = knowledge_text + "\n\n" + system_prompt

    # 1c. File-ref resolution — read workspace files and inject content
    workspace_dir = self.config.get("workspace_dir")
    file_ref_fields = self.config.get("resolve_file_refs", [])
    if workspace_dir and file_ref_fields:
        from loom.core.workspace import WorkspaceManager

        ws = WorkspaceManager(workspace_dir)
        for field in file_ref_fields:
            if field in payload:
                try:
                    content = ws.read_json(payload[field])
                    payload[f"{field}_content"] = content
                except (ValueError, FileNotFoundError, json.JSONDecodeError) as e:
                    logger.warning(
                        "worker.file_ref_resolution_failed",
                        field=field,
                        error=str(e),
                    )

    user_message = json.dumps(payload, indent=2)

    # 2. Load tool providers from knowledge_silos
    tool_providers = _load_tool_providers(silos)
    tool_defs = [p.get_definition() for p in tool_providers.values()] or None

    # 3. Resolve backend from task metadata or config default
    tier = metadata.get(
        "model_tier",
        self.config.get("default_model_tier", self.config.get("default_tier", "standard")),
    )
    backend = self.backends.get(tier)
    if not backend:
        raise RuntimeError(f"No backend for tier: {tier}")

    # 4. Call LLM with tool-use loop
    logger.info("worker.calling_llm", tier=tier, tools=len(tool_providers))
    max_tokens = self.config.get("max_output_tokens", 2000)
    result = await execute_with_tools(
        backend=backend,
        system_prompt=system_prompt,
        user_message=user_message,
        tool_providers=tool_providers,
        tool_defs=tool_defs,
        max_tokens=max_tokens,
    )
    total_prompt_tokens = result["prompt_tokens"]
    total_completion_tokens = result["completion_tokens"]

    # 4b. Log token usage for cost tracking.
    logger.info(
        "worker.llm_usage",
        worker_type=self.config.get("name", "unknown"),
        model_used=result.get("model", "unknown"),
        input_tokens=total_prompt_tokens,
        output_tokens=total_completion_tokens,
    )

    # 5. Parse JSON output — handles markdown fences and preamble text
    if result.get("content") is None:
        raise ValueError("LLM did not produce a text response after tool-use loop")

    output = _extract_json(result["content"])

    # 6. Process silo_updates — apply write-back to writable folder silos
    silo_updates = output.pop("silo_updates", None)
    if silo_updates:
        from loom.worker.knowledge import apply_silo_updates

        apply_silo_updates(silo_updates, silos)

    return {
        "output": output,
        "model_used": result["model"],
        "token_usage": {
            "prompt_tokens": total_prompt_tokens,
            "completion_tokens": total_completion_tokens,
        },
    }
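The fence-stripping fallback used in step 5 can be approximated as follows. This is a sketch, not the actual _extract_json implementation — it shows the general recovery order (raw JSON, then a markdown fence, then the outermost brace pair past any preamble text):

```python
import json
import re

# Sketch of fence-stripping JSON extraction — an approximation of what a
# helper like _extract_json might do, not the actual implementation.
def extract_json(text: str) -> dict:
    # 1. Try the raw text first.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # 2. Strip a markdown fence (```json ... ```) if present.
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
    if fenced:
        return json.loads(fenced.group(1))
    # 3. Fall back to the outermost brace pair (skips preamble text).
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        return json.loads(text[start : end + 1])
    raise ValueError("no JSON object found in LLM response")

out = extract_json('Here you go:\n```json\n{"summary": "ok"}\n```')
```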

execute_with_tools async

execute_with_tools(backend: LLMBackend, system_prompt: str, user_message: str, tool_providers: dict[str, ToolProvider], tool_defs: list[dict[str, Any]] | None, max_tokens: int = 2000) -> dict[str, Any]

Execute an LLM call with multi-round tool-use loop.

This function encapsulates the core LLM interaction pattern used by both LLMWorker.process() and WorkerTestRunner.run(). It handles:

  • Initial LLM call with optional tool definitions
  • Multi-turn tool execution loop (up to MAX_TOOL_ROUNDS)
  • Token count aggregation across rounds
  • Error handling for unknown tools and tool execution failures

Parameters:

Name Type Description Default
backend LLMBackend

LLM backend to call (Anthropic, Ollama, OpenAI-compatible).

required
system_prompt str

Full system prompt (with knowledge silo content prepended).

required
user_message str

User message (typically JSON payload).

required
tool_providers dict[str, ToolProvider]

Map of tool name → ToolProvider for execution.

required
tool_defs list[dict[str, Any]] | None

Tool definitions list for the LLM (or None for no tools).

required
max_tokens int

Maximum output tokens per LLM call.

2000

Returns:

Type Description
dict[str, Any]

Dict with keys: content, model, prompt_tokens, completion_tokens, tool_calls, stop_reason. Token counts are aggregated across all tool-use rounds.

Source code in src/loom/worker/runner.py
async def execute_with_tools(  # noqa: PLR0915
    backend: LLMBackend,
    system_prompt: str,
    user_message: str,
    tool_providers: dict[str, ToolProvider],
    tool_defs: list[dict[str, Any]] | None,
    max_tokens: int = 2000,
) -> dict[str, Any]:
    """Execute an LLM call with multi-round tool-use loop.

    This function encapsulates the core LLM interaction pattern used by both
    ``LLMWorker.process()`` and ``WorkerTestRunner.run()``.  It handles:

    - Initial LLM call with optional tool definitions
    - Multi-turn tool execution loop (up to ``MAX_TOOL_ROUNDS``)
    - Token count aggregation across rounds
    - Error handling for unknown tools and tool execution failures

    Args:
        backend: LLM backend to call (Anthropic, Ollama, OpenAI-compatible).
        system_prompt: Full system prompt (with knowledge silo content prepended).
        user_message: User message (typically JSON payload).
        tool_providers: Map of tool name → ToolProvider for execution.
        tool_defs: Tool definitions list for the LLM (or None for no tools).
        max_tokens: Maximum output tokens per LLM call.

    Returns:
        Dict with keys: content, model, prompt_tokens, completion_tokens,
        tool_calls, stop_reason.  Token counts are aggregated across all
        tool-use rounds.
    """
    with _tracer.start_as_current_span(
        "llm.call",
        attributes={"llm.max_tokens": max_tokens, "llm.has_tools": tool_defs is not None},
    ) as llm_span:
        result = await backend.complete(
            system_prompt=system_prompt,
            user_message=user_message,
            max_tokens=max_tokens,
            tools=tool_defs,
        )
        # Legacy attributes (backward compat)
        llm_span.set_attribute("llm.model", result.get("model", "unknown"))
        llm_span.set_attribute("llm.prompt_tokens", result.get("prompt_tokens", 0))
        llm_span.set_attribute("llm.completion_tokens", result.get("completion_tokens", 0))

        # OTel GenAI semantic conventions
        # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/
        llm_span.set_attribute("gen_ai.system", result.get("gen_ai_system", "unknown"))
        llm_span.set_attribute("gen_ai.request.model", result.get("gen_ai_request_model", ""))
        llm_span.set_attribute("gen_ai.response.model", result.get("gen_ai_response_model", ""))
        llm_span.set_attribute("gen_ai.usage.input_tokens", result.get("prompt_tokens", 0))
        llm_span.set_attribute("gen_ai.usage.output_tokens", result.get("completion_tokens", 0))
        if result.get("gen_ai_request_temperature") is not None:
            llm_span.set_attribute(
                "gen_ai.request.temperature", result["gen_ai_request_temperature"]
            )
        if result.get("gen_ai_request_max_tokens") is not None:
            llm_span.set_attribute("gen_ai.request.max_tokens", result["gen_ai_request_max_tokens"])

        # Optional content logging (opt-in via env var — may contain PII)
        if os.environ.get("LOOM_TRACE_CONTENT", "").lower() in ("1", "true"):
            llm_span.add_event(
                "gen_ai.content.prompt",
                {"gen_ai.prompt": user_message},
            )
            if result.get("content"):
                llm_span.add_event(
                    "gen_ai.content.completion",
                    {"gen_ai.completion": result["content"]},
                )

    total_prompt_tokens = result.get("prompt_tokens", 0)
    total_completion_tokens = result.get("completion_tokens", 0)
    messages: list[dict[str, Any]] | None = None
    rounds = 0

    while result.get("tool_calls") and rounds < MAX_TOOL_ROUNDS:
        rounds += 1
        logger.info("worker.tool_round", round=rounds, calls=len(result["tool_calls"]))

        # Build message history on first tool round
        if messages is None:
            messages = [{"role": "user", "content": user_message}]

        # Append assistant message with tool calls
        assistant_msg: dict[str, Any] = {"role": "assistant", "tool_calls": result["tool_calls"]}
        if result.get("content"):
            assistant_msg["content"] = result["content"]
        messages.append(assistant_msg)

        # Execute each tool call
        for call in result["tool_calls"]:
            tool_name = call["name"]
            provider = tool_providers.get(tool_name)
            if provider is None:
                tool_result = json.dumps({"error": f"Unknown tool: {tool_name}"})
                logger.warning("worker.unknown_tool", tool=tool_name)
            else:
                try:
                    tool_result = await provider.execute(call["arguments"])
                except Exception as e:
                    tool_result = json.dumps({"error": str(e)})
                    logger.error("worker.tool_execution_failed", tool=tool_name, error=str(e))

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call["id"],
                    "content": tool_result,
                }
            )

        # Call LLM again with updated message history
        with _tracer.start_as_current_span(
            "llm.tool_continuation",
            attributes={"llm.tool_round": rounds},
        ) as cont_span:
            result = await backend.complete(
                system_prompt=system_prompt,
                user_message=user_message,
                messages=messages,
                max_tokens=max_tokens,
                tools=tool_defs,
            )
            cont_span.set_attribute("llm.prompt_tokens", result.get("prompt_tokens", 0))
            cont_span.set_attribute("llm.completion_tokens", result.get("completion_tokens", 0))
        total_prompt_tokens += result.get("prompt_tokens", 0)
        total_completion_tokens += result.get("completion_tokens", 0)

    if rounds >= MAX_TOOL_ROUNDS:
        logger.warning("worker.max_tool_rounds_reached", rounds=rounds)

    # Return result with aggregated token counts
    result["prompt_tokens"] = total_prompt_tokens
    result["completion_tokens"] = total_completion_tokens
    return result

Backends

LLM backend implementations: AnthropicBackend, OllamaBackend, OpenAICompatibleBackend. Plus build_backends_from_env() for automatic backend detection from environment variables and ~/.loom/config.yaml.

backends

LLM backend adapters — uniform interface for local and API models.

Each backend wraps a specific LLM provider's API and normalizes the response into a consistent dict format. Workers never call APIs directly; they always go through a backend.

To add a new backend
  1. Subclass LLMBackend
  2. Implement complete() returning the standard response dict
  3. Register it in cli/main.py's worker command (backend resolution by tier)

All backends use httpx with a 120s timeout. Adjust if your models are slow.

Tool-use support

Backends accept optional tools and messages parameters. When tools is provided, the LLM may return tool_calls instead of content. When messages is provided, it replaces the single user_message for multi-turn conversations (tool execution loop).
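Concretely, the shapes look like this — the tool name and fields are illustrative; the definition format mirrors the OpenAI-style function defs the backends translate, and the tool_calls entry is the normalized form described on this page:

```python
# Illustrative tool definition (as passed in `tools`) and the normalized
# tool_calls entry a backend may return instead of `content`.
tool_def = {
    "name": "lookup_order",              # hypothetical tool
    "description": "Fetch an order by ID.",
    "parameters": {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"],
    },
}

# When the LLM decides to call the tool, the normalized response carries
# tool_calls and no text content:
response = {
    "content": None,
    "tool_calls": [
        {"id": "call_0", "name": "lookup_order", "arguments": {"order_id": "A17"}},
    ],
    "stop_reason": "tool_use",
}
```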

LLMBackend

Bases: ABC

Common interface all model backends implement.

complete abstractmethod async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request and return a normalized response dict.

Parameters:

Name Type Description Default
system_prompt str

System instructions for the LLM.

required
user_message str

User message (ignored when messages is provided).

required
max_tokens int

Maximum tokens in the response.

2000
temperature float

Sampling temperature.

0.0
tools list[dict[str, Any]] | None

Optional list of tool definitions for function-calling.

None
messages list[dict[str, Any]] | None

Optional full message history for multi-turn. When provided, overrides user_message.

None

Returns:

Type Description
dict[str, Any]

A dict with the following structure:

    {
        "content": str | None,      # Text response (None if tool_calls)
        "model": str,               # Model identifier
        "prompt_tokens": int,
        "completion_tokens": int,
        "tool_calls": list | None,  # [{"id": str, "name": str, "arguments": dict}]
        "stop_reason": str | None,  # "end_turn" | "tool_use"
    }

Source code in src/loom/worker/backends.py
@abstractmethod
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request and return a normalized response dict.

    Args:
        system_prompt: System instructions for the LLM.
        user_message: User message (ignored when ``messages`` is provided).
        max_tokens: Maximum tokens in the response.
        temperature: Sampling temperature.
        tools: Optional list of tool definitions for function-calling.
        messages: Optional full message history for multi-turn. When
            provided, overrides ``user_message``.

    Returns:
        A dict with the following structure::

            {
                "content": str | None,      # Text response (None if tool_calls)
                "model": str,               # Model identifier
                "prompt_tokens": int,
                "completion_tokens": int,
                "tool_calls": list | None,  # [{"id": str, "name": str, "arguments": dict}]
                "stop_reason": str | None,  # "end_turn" | "tool_use"
            }
    """
    ...

AnthropicBackend

AnthropicBackend(api_key: str, model: str = 'claude-sonnet-4-20250514')

Bases: LLMBackend

Claude API via httpx (Messages API).

Uses the Anthropic Messages API directly via httpx rather than the anthropic Python SDK — this keeps dependencies minimal and avoids version coupling.

Source code in src/loom/worker/backends.py
def __init__(self, api_key: str, model: str = "claude-sonnet-4-20250514") -> None:
    self.api_key = api_key
    self.model = model
    self.client = httpx.AsyncClient(
        base_url="https://api.anthropic.com",
        headers={
            "x-api-key": api_key,
            # Anthropic API version — pinned for reproducibility.
            # See: https://docs.anthropic.com/en/api/versioning
            "anthropic-version": "2024-10-22",
            "content-type": "application/json",
        },
        timeout=120.0,
    )

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via the Anthropic Messages API.

Source code in src/loom/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via the Anthropic Messages API."""
    # Build messages array
    if messages is not None:
        api_messages = _anthropic_messages(messages)
    else:
        api_messages = [{"role": "user", "content": user_message}]

    body: dict[str, Any] = {
        "model": self.model,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "system": system_prompt,
        "messages": api_messages,
    }

    # Add tool definitions if provided
    if tools:
        body["tools"] = [
            {
                "name": t["name"],
                "description": t.get("description", ""),
                "input_schema": t.get("parameters", {"type": "object"}),
            }
            for t in tools
        ]

    resp = await self.client.post("/v1/messages", json=body)
    resp.raise_for_status()
    data = resp.json()

    # Parse response — may contain text blocks, tool_use blocks, or both
    content = None
    tool_calls = None

    for block in data.get("content", []):
        if block["type"] == "text":
            content = block["text"]
        elif block["type"] == "tool_use":
            if tool_calls is None:
                tool_calls = []
            tool_calls.append(
                {
                    "id": block["id"],
                    "name": block["name"],
                    "arguments": block["input"],
                }
            )

    return {
        "content": content,
        "model": data["model"],
        "prompt_tokens": data["usage"]["input_tokens"],
        "completion_tokens": data["usage"]["output_tokens"],
        "tool_calls": tool_calls,
        "stop_reason": data.get("stop_reason"),
        # OTel GenAI semantic convention metadata
        "gen_ai_system": "anthropic",
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": data["model"],
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }

OllamaBackend

OllamaBackend(model: str = 'llama3.2:3b', base_url: str = 'http://ollama:11434')

Bases: LLMBackend

Local models via Ollama HTTP API.

Default base_url points to K8s service name "ollama". For local dev, override with http://localhost:11434 (set OLLAMA_URL env var).

Note: Ollama's token counts (prompt_eval_count, eval_count) may be absent for some models; we default to 0 in that case.

Source code in src/loom/worker/backends.py
def __init__(self, model: str = "llama3.2:3b", base_url: str = "http://ollama:11434") -> None:
    self.model = model
    self.client = httpx.AsyncClient(base_url=base_url, timeout=120.0)

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via the Ollama HTTP API.

Source code in src/loom/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via the Ollama HTTP API."""
    # Build messages array
    if messages is not None:
        api_messages = [
            {"role": "system", "content": system_prompt},
            *_ollama_messages(messages),
        ]
    else:
        api_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

    body: dict[str, Any] = {
        "model": self.model,
        "messages": api_messages,
        "stream": False,
        "options": {"temperature": temperature, "num_predict": max_tokens},
    }

    # Add tool definitions if provided (OpenAI-compatible format)
    if tools:
        body["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("parameters", {"type": "object"}),
                },
            }
            for t in tools
        ]

    resp = await self.client.post("/api/chat", json=body)
    resp.raise_for_status()
    data = resp.json()

    # Parse tool calls from Ollama response
    message = data.get("message", {})
    content = message.get("content") or None
    tool_calls = None

    raw_calls = message.get("tool_calls")
    if raw_calls:
        tool_calls = []
        for i, call in enumerate(raw_calls):
            func = call.get("function", {})
            tool_calls.append(
                {
                    "id": f"call_{i}",
                    "name": func.get("name", ""),
                    "arguments": func.get("arguments", {}),
                }
            )

    stop_reason = "tool_use" if tool_calls else "end_turn"

    return {
        "content": content,
        "model": self.model,
        "prompt_tokens": data.get("prompt_eval_count", 0),
        "completion_tokens": data.get("eval_count", 0),
        "tool_calls": tool_calls,
        "stop_reason": stop_reason,
        # OTel GenAI semantic convention metadata
        "gen_ai_system": "ollama",
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": self.model,
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }

OpenAICompatibleBackend

OpenAICompatibleBackend(base_url: str, api_key: str = 'not-needed', model: str = 'default')

Bases: LLMBackend

Any OpenAI-compatible API (vLLM, llama.cpp server, LiteLLM, etc.).

Source code in src/loom/worker/backends.py
def __init__(self, base_url: str, api_key: str = "not-needed", model: str = "default") -> None:
    self.model = model
    self.client = httpx.AsyncClient(
        base_url=base_url,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=120.0,
    )

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via an OpenAI-compatible API.

Source code in src/loom/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via an OpenAI-compatible API."""
    # Build messages array
    if messages is not None:
        api_messages = [
            {"role": "system", "content": system_prompt},
            *_openai_messages(messages),
        ]
    else:
        api_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

    body: dict[str, Any] = {
        "model": self.model,
        "messages": api_messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    # Add tool definitions if provided
    if tools:
        body["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("parameters", {"type": "object"}),
                },
            }
            for t in tools
        ]

    resp = await self.client.post("/v1/chat/completions", json=body)
    resp.raise_for_status()
    data = resp.json()
    usage = data.get("usage", {})

    # Parse response
    choice = data["choices"][0]
    message = choice.get("message", {})
    content = message.get("content")
    tool_calls = None

    raw_calls = message.get("tool_calls")
    if raw_calls:
        tool_calls = []
        for call in raw_calls:
            func = call.get("function", {})
            args = func.get("arguments", "{}")
            if isinstance(args, str):
                try:
                    args = json.loads(args)
                except json.JSONDecodeError:
                    args = {"_raw": args}
            tool_calls.append(
                {
                    "id": call.get("id", ""),
                    "name": func.get("name", ""),
                    "arguments": args,
                }
            )

    finish_reason = choice.get("finish_reason", "stop")
    stop_reason = "tool_use" if finish_reason == "tool_calls" else "end_turn"

    return {
        "content": content,
        "model": data.get("model", self.model),
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "tool_calls": tool_calls,
        "stop_reason": stop_reason,
        # OTel GenAI semantic convention metadata
        "gen_ai_system": "openai",
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": data.get("model", self.model),
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }
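The tool-definition translation inside complete() is small enough to exercise standalone. This sketch replicates the same mapping from Loom's tool dicts to the OpenAI tools wire format (the helper name to_openai_tools is hypothetical, not part of loom):

```python
from typing import Any


def to_openai_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Mirror of the translation in complete(): wrap each Loom tool
    definition in the OpenAI {"type": "function", ...} envelope."""
    return [
        {
            "type": "function",
            "function": {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t.get("parameters", {"type": "object"}),
            },
        }
        for t in tools
    ]


wire = to_openai_tools([{"name": "search_documents"}])
print(wire[0]["function"]["parameters"])  # → {'type': 'object'}
```

Note that missing description and parameters fall back to an empty string and a bare object schema, so a minimal tool dict with just a name is still valid on the wire.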

build_backends_from_env

build_backends_from_env() -> dict[str, LLMBackend]

Build LLM backends from environment variables and ~/.loom/config.yaml.

Resolution priority: env vars > config.yaml > built-in defaults.

Resolves available backends based on which env vars are set:

- OLLAMA_URL → OllamaBackend for the local tier
- OLLAMA_MODEL → Override Ollama model (default: llama3.2:3b)
- ANTHROPIC_API_KEY → AnthropicBackend for standard + frontier
- FRONTIER_MODEL → Override frontier model (default: claude-opus-4-20250514)

Returns:

dict[str, LLMBackend]: Dict mapping tier name → LLMBackend instance. May be empty if no environment variables are set.

Source code in src/loom/worker/backends.py
def build_backends_from_env() -> dict[str, LLMBackend]:
    """Build LLM backends from environment variables and ``~/.loom/config.yaml``.

    Resolution priority: env vars > config.yaml > built-in defaults.

    Resolves available backends based on which env vars are set:

    - ``OLLAMA_URL`` → OllamaBackend for the ``local`` tier
    - ``OLLAMA_MODEL`` → Override Ollama model (default: ``llama3.2:3b``)
    - ``ANTHROPIC_API_KEY`` → AnthropicBackend for ``standard`` + ``frontier``
    - ``FRONTIER_MODEL`` → Override frontier model (default: ``claude-opus-4-20250514``)

    Returns:
        Dict mapping tier name → LLMBackend instance. May be empty if no
        environment variables are set.
    """
    # Load config.yaml defaults (best-effort; env vars still override)
    try:
        from loom.cli.config import apply_config_to_env, load_config

        config = load_config()
        apply_config_to_env(config)
    except Exception:
        pass

    backends: dict[str, LLMBackend] = {}

    if os.getenv("OLLAMA_URL"):
        ollama_model = os.getenv("OLLAMA_MODEL", "llama3.2:3b")
        backends["local"] = OllamaBackend(model=ollama_model, base_url=os.getenv("OLLAMA_URL"))

    if os.getenv("ANTHROPIC_API_KEY"):
        backends["standard"] = AnthropicBackend(api_key=os.getenv("ANTHROPIC_API_KEY"))
        backends["frontier"] = AnthropicBackend(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model=os.getenv("FRONTIER_MODEL", "claude-opus-4-20250514"),
        )

    return backends
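The tier-resolution logic can be sketched without importing loom; this simplified stand-in (resolve_tiers is hypothetical) mirrors which tiers build_backends_from_env would populate for a given environment:

```python
def resolve_tiers(env: dict[str, str]) -> dict[str, str]:
    """Simplified stand-in for build_backends_from_env(): map each
    configured tier to a label for the backend it would receive."""
    tiers: dict[str, str] = {}
    if env.get("OLLAMA_URL"):
        tiers["local"] = f"ollama:{env.get('OLLAMA_MODEL', 'llama3.2:3b')}"
    if env.get("ANTHROPIC_API_KEY"):
        tiers["standard"] = "anthropic:default"
        tiers["frontier"] = (
            f"anthropic:{env.get('FRONTIER_MODEL', 'claude-opus-4-20250514')}"
        )
    return tiers


print(resolve_tiers({"OLLAMA_URL": "http://localhost:11434"}))
# → {'local': 'ollama:llama3.2:3b'}
```

With only OLLAMA_URL set, only the local tier exists; adding ANTHROPIC_API_KEY populates standard and frontier as well.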

Processor

ProcessorWorker and SyncProcessingBackend ABC for non-LLM workers. Includes serialize_writes option and BackendError hierarchy.

processor

Processor worker for non-LLM task processing.

ProcessorWorker delegates to a ProcessingBackend — any Python library, rules engine, or external tool that isn't an LLM. Examples: Docling for document extraction, ffmpeg for media, scikit-learn for classification.

This module also provides:

BackendError
    Base exception for processing backend failures. Backend
    implementations should subclass this (e.g., DoclingConversionError)
    to provide structured errors with the original cause preserved.

SyncProcessingBackend
    Base class for backends wrapping synchronous, CPU-bound libraries.
    Subclasses implement ``process_sync()`` which is automatically
    offloaded to a thread pool via ``asyncio.run_in_executor``.

BackendError

Bases: Exception

Base error for processing backend failures.

Backend implementations should raise subclasses of this to provide structured, domain-specific errors with the original cause preserved via __cause__.

Example::

class DoclingConversionError(BackendError):
    """Raised when Docling fails to convert a document."""

try:
    converter.convert(path)
except Exception as exc:
    raise DoclingConversionError(f"Failed: {exc}") from exc

ProcessingBackend

Bases: ABC

Generic processing backend interface for non-LLM workers.

Implementations wrap a specific tool or library (Docling, ffmpeg, etc.) and translate between Loom's payload/output dicts and that tool's API.

process abstractmethod async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Process a task payload.

Parameters:

- payload (dict[str, Any], required): Validated input dict from TaskMessage.
- config (dict[str, Any], required): Full worker config dict (for backend-specific settings).

Returns:

dict[str, Any]: A dict with the following structure:

    {
        "output": dict,           # Structured output matching output_schema
        "model_used": str | None, # Identifier (e.g., "docling-v2", "ffmpeg-6.1")
    }

Source code in src/loom/worker/processor.py
@abstractmethod
async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Process a task payload.

    Args:
        payload: Validated input dict from TaskMessage.
        config: Full worker config dict (for backend-specific settings).

    Returns:
        A dict with the following structure::

            {
                "output": dict,           # Structured output matching output_schema
                "model_used": str | None, # Identifier (e.g., "docling-v2", "ffmpeg-6.1")
            }
    """
    ...

SyncProcessingBackend

SyncProcessingBackend(*, serialize_writes: bool = False)

Bases: ProcessingBackend

Base class for backends wrapping synchronous, CPU-bound libraries.

Subclasses implement process_sync() instead of process(). The synchronous method is automatically offloaded to a thread pool via asyncio.run_in_executor so the async event loop stays responsive.

If serialize_writes=True, an asyncio.Lock ensures only one call to process_sync runs at a time. Use this for backends that write to single-writer stores like DuckDB.

Use this for backends that wrap libraries like Docling, ffmpeg, scikit-learn, or any other tool that performs blocking I/O or CPU-intensive computation.

Example::

class FFmpegBackend(SyncProcessingBackend):
    def process_sync(self, payload, config):
        # CPU-bound work — runs in thread pool automatically
        subprocess.run(["ffmpeg", ...])
        return {"output": {...}, "model_used": "ffmpeg"}
Source code in src/loom/worker/processor.py
def __init__(self, *, serialize_writes: bool = False) -> None:
    self._write_lock: asyncio.Lock | None = asyncio.Lock() if serialize_writes else None

process_sync abstractmethod

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Process a task payload synchronously.

This method runs in a thread pool — do not use await here. Return format is identical to ProcessingBackend.process().

Parameters:

- payload (dict[str, Any], required): Validated input dict from TaskMessage.
- config (dict[str, Any], required): Full worker config dict (for backend-specific settings).

Returns:

dict[str, Any]: {"output": dict, "model_used": str | None}

Source code in src/loom/worker/processor.py
@abstractmethod
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Process a task payload synchronously.

    This method runs in a thread pool — do not use ``await`` here.
    Return format is identical to ``ProcessingBackend.process()``.

    Args:
        payload: Validated input dict from TaskMessage.
        config: Full worker config dict (for backend-specific settings).

    Returns:
        ``{"output": dict, "model_used": str | None}``
    """
    ...

process async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Offload process_sync() to a thread pool and return the result.

If serialize_writes was set, acquires the write lock first to ensure single-writer semantics (e.g., for DuckDB).

Source code in src/loom/worker/processor.py
async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Offload process_sync() to a thread pool and return the result.

    If ``serialize_writes`` was set, acquires the write lock first to
    ensure single-writer semantics (e.g., for DuckDB).
    """
    loop = asyncio.get_running_loop()
    if self._write_lock is not None:
        async with self._write_lock:
            return await loop.run_in_executor(None, self.process_sync, payload, config)
    return await loop.run_in_executor(None, self.process_sync, payload, config)
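The offload-plus-lock pattern is compact enough to sketch standalone. This example (written without the loom base class so it runs on its own; the UpperCaseBackend is hypothetical) follows the same process()/process_sync() contract, including serialize_writes:

```python
import asyncio
from typing import Any


class SyncBackendSketch:
    """Minimal stand-in for SyncProcessingBackend: offloads blocking
    process_sync() to the default thread pool, optionally serialized."""

    def __init__(self, *, serialize_writes: bool = False) -> None:
        self._write_lock: asyncio.Lock | None = None
        self._serialize_writes = serialize_writes

    def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        raise NotImplementedError

    async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        loop = asyncio.get_running_loop()
        if self._serialize_writes:
            if self._write_lock is None:
                self._write_lock = asyncio.Lock()  # created lazily, inside the loop
            async with self._write_lock:
                return await loop.run_in_executor(None, self.process_sync, payload, config)
        return await loop.run_in_executor(None, self.process_sync, payload, config)


class UpperCaseBackend(SyncBackendSketch):
    def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        # Pretend this is CPU-bound work (e.g., Docling, ffmpeg)
        return {"output": {"text": payload["text"].upper()}, "model_used": "upper-v1"}


async def main() -> dict[str, Any]:
    backend = UpperCaseBackend(serialize_writes=True)
    return await backend.process({"text": "hi"}, {})


result = asyncio.run(main())
print(result["output"])  # → {'text': 'HI'}
```

With serialize_writes=True, overlapping process() calls queue on the lock, which is the single-writer behavior the real class documents for stores like DuckDB.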

ProcessorWorker

ProcessorWorker(actor_id: str, config_path: str, backend: ProcessingBackend, nats_url: str = 'nats://nats:4222')

Bases: TaskWorker

Non-LLM stateless worker.

Delegates processing to a ProcessingBackend instead of an LLM. Follows the same lifecycle as LLMWorker: validate input, process, validate output, publish result.

Source code in src/loom/worker/processor.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backend: ProcessingBackend,
    nats_url: str = "nats://nats:4222",
) -> None:
    super().__init__(actor_id, config_path, nats_url)
    self.backend = backend

process async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Delegate processing to the backend and return the result.

Source code in src/loom/worker/processor.py
async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Delegate processing to the backend and return the result."""
    logger.info("processor.processing", backend=type(self.backend).__name__)
    result = await self.backend.process(payload, self.config)
    return {
        "output": result["output"],
        "model_used": result.get("model_used"),
        "token_usage": {},
    }

Tools

ToolProvider ABC and SyncToolProvider for LLM function-calling tools. Workers can expose tools that the LLM calls during processing (max 10 rounds).

tools

Tool provider abstraction for LLM function-calling.

Workers can offer tools to LLMs via their config's knowledge_silos key. Each tool-type silo specifies a provider class path (fully qualified, like processing_backend) and a config dict passed to the constructor.

Tool providers define what the LLM can call (via get_definition()) and execute the call when the LLM invokes it (via execute()).

Example config::

knowledge_silos:
  - name: "document_catalog"
    type: "tool"
    provider: "docman.tools.duckdb_view.DuckDBViewTool"
    config:
      db_path: "/tmp/docman-workspace/docman.duckdb"
      view_name: "document_summaries"

ToolProvider

Bases: ABC

A tool that can be offered to an LLM for function-calling.

Subclasses define the tool's JSON Schema definition and implement the execution logic. The LLMWorker manages the multi-turn loop: it passes tool definitions to the backend, receives tool_calls, dispatches to the appropriate provider, and feeds results back.

get_definition abstractmethod

get_definition() -> dict[str, Any]

Return the tool definition in standard JSON Schema format.

The returned dict must contain:

- name: Tool name (alphanumeric + underscores)
- description: What the tool does (shown to LLM)
- parameters: JSON Schema object for the tool's arguments

Example::

{
    "name": "search_documents",
    "description": "Full-text search over document catalog",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "limit": {"type": "integer", "default": 10},
        },
        "required": ["query"],
    },
}
Source code in src/loom/worker/tools.py
@abstractmethod
def get_definition(self) -> dict[str, Any]:
    """Return the tool definition in standard JSON Schema format.

    The returned dict must contain:
        - ``name``: Tool name (alphanumeric + underscores)
        - ``description``: What the tool does (shown to LLM)
        - ``parameters``: JSON Schema object for the tool's arguments

    Example::

        {
            "name": "search_documents",
            "description": "Full-text search over document catalog",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "limit": {"type": "integer", "default": 10},
                },
                "required": ["query"],
            },
        }
    """
    ...

execute abstractmethod async

execute(arguments: dict[str, Any]) -> str

Execute the tool with LLM-provided arguments.

Parameters:

- arguments (dict[str, Any], required): Parsed arguments matching the tool's parameters schema.

Returns:

str: Result as a string (typically JSON). This is sent back to the LLM as the tool result in the next turn.

Source code in src/loom/worker/tools.py
@abstractmethod
async def execute(self, arguments: dict[str, Any]) -> str:
    """Execute the tool with LLM-provided arguments.

    Args:
        arguments: Parsed arguments matching the tool's parameters schema.

    Returns:
        Result as a string (typically JSON). This is sent back to the LLM
        as the tool result in the next turn.
    """
    ...

SyncToolProvider

Bases: ToolProvider

Convenience base for synchronous tool implementations.

Subclasses implement execute_sync() which is automatically offloaded to a thread pool. Use this for tools that wrap synchronous libraries (e.g., DuckDB queries, file I/O).

execute_sync abstractmethod

execute_sync(arguments: dict[str, Any]) -> str

Execute the tool synchronously. Runs in a thread pool.

Source code in src/loom/worker/tools.py
@abstractmethod
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Execute the tool synchronously. Runs in a thread pool."""
    ...

execute async

execute(arguments: dict[str, Any]) -> str

Offload execute_sync() to a thread pool and return the result.

Source code in src/loom/worker/tools.py
async def execute(self, arguments: dict[str, Any]) -> str:
    """Offload execute_sync() to a thread pool and return the result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.execute_sync, arguments)
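A complete tool following this contract might look like the sketch below. The WordCountTool is hypothetical, and it is written without the loom base class so it runs standalone; the method bodies match what a SyncToolProvider subclass would implement:

```python
import asyncio
import json
from typing import Any


class WordCountTool:
    """Hypothetical tool following the ToolProvider contract."""

    def get_definition(self) -> dict[str, Any]:
        return {
            "name": "word_count",
            "description": "Count the words in a text snippet",
            "parameters": {
                "type": "object",
                "properties": {"text": {"type": "string"}},
                "required": ["text"],
            },
        }

    def execute_sync(self, arguments: dict[str, Any]) -> str:
        # Tool results go back to the LLM as a string, typically JSON
        return json.dumps({"words": len(arguments["text"].split())})

    async def execute(self, arguments: dict[str, Any]) -> str:
        # Same offload as SyncToolProvider: keep the event loop responsive
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.execute_sync, arguments)


result = asyncio.run(WordCountTool().execute({"text": "one two three"}))
print(result)  # → {"words": 3}
```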

load_tool_provider

load_tool_provider(class_path: str, config: dict[str, Any]) -> ToolProvider

Import and instantiate a ToolProvider by fully qualified class path.

Follows the same dynamic-import pattern as _load_processing_backend in cli/main.py.

Parameters:

- class_path (str, required): Dotted path like docman.tools.duckdb_view.DuckDBViewTool.
- config (dict[str, Any], required): Dict of keyword arguments passed to the constructor.

Returns:

ToolProvider: An instantiated ToolProvider.

Raises:

- ImportError: If the module cannot be imported.
- AttributeError: If the class is not found in the module.
- TypeError: If the class is not a ToolProvider subclass.

Source code in src/loom/worker/tools.py
def load_tool_provider(class_path: str, config: dict[str, Any]) -> ToolProvider:
    """Import and instantiate a ToolProvider by fully qualified class path.

    Follows the same dynamic-import pattern as ``_load_processing_backend``
    in ``cli/main.py``.

    Args:
        class_path: Dotted path like ``docman.tools.duckdb_view.DuckDBViewTool``.
        config: Dict of keyword arguments passed to the constructor.

    Returns:
        An instantiated ToolProvider.

    Raises:
        ImportError: If the module cannot be imported.
        AttributeError: If the class is not found in the module.
        TypeError: If the class is not a ToolProvider subclass.
    """
    if "." not in class_path:
        raise ImportError(
            f"Tool provider '{class_path}' must be a fully qualified class path "
            f"(e.g., 'docman.tools.duckdb_view.DuckDBViewTool')"
        )

    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)

    tool_class = getattr(module, class_name, None)
    if tool_class is None:
        raise AttributeError(f"Tool class '{class_name}' not found in '{module_path}'")

    if not (isinstance(tool_class, type) and issubclass(tool_class, ToolProvider)):
        raise TypeError(f"'{class_path}' is not a ToolProvider subclass")

    return tool_class(**config)
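The same rsplit-plus-import_module pattern can be demonstrated against the standard library. This sketch (load_class is hypothetical) omits only the ToolProvider subclass check:

```python
import importlib


def load_class(class_path: str) -> type:
    """Same dynamic-import pattern as load_tool_provider(), minus the
    ToolProvider subclass check, demonstrated against the stdlib."""
    if "." not in class_path:
        raise ImportError(f"'{class_path}' must be a fully qualified class path")
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name, None)
    if cls is None:
        raise AttributeError(f"Class '{class_name}' not found in '{module_path}'")
    return cls


decoder_cls = load_class("json.JSONDecoder")
print(decoder_cls.__name__)  # → JSONDecoder
```

The rsplit(".", 1) split means only the last dotted segment is treated as the class name; everything before it must be an importable module path.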

Knowledge

Knowledge silo loading, injection into system prompts, and write-back. Supports read-only, read-write, and tool-based knowledge sources.

knowledge

Scoped knowledge/RAG loader for worker context injection.

Workers can have knowledge sources defined in their config YAML under a knowledge_sources key. This module loads those files and formats them for injection into the system prompt, giving workers domain-specific context.

Knowledge silos extend this with folder-based knowledge:

knowledge_silos:
  - name: "classification_guides"
    type: "folder"
    path: "knowledge/classification/"
    permissions: "read"  # "read" or "read_write"

Folder silos load all text files from a directory into the system prompt. Writable silos accept silo_updates from the LLM output to persist learned patterns (add/modify/delete files within the silo folder).

load_knowledge_sources

load_knowledge_sources(sources: list[dict[str, Any]]) -> str

Load knowledge sources and format them for system prompt injection.

Each source has:

- path: file path to the knowledge file
- inject_as: "reference" (append to prompt) or "few_shot" (format as examples)

Source code in src/loom/worker/knowledge.py
def load_knowledge_sources(sources: list[dict[str, Any]]) -> str:
    """
    Load knowledge sources and format them for system prompt injection.

    Each source has:
    - path: file path to the knowledge file
    - inject_as: "reference" (append to prompt) or "few_shot" (format as examples)
    """
    sections = []

    for source in sources:
        path = Path(source["path"])
        inject_as = source.get("inject_as", "reference")

        if not path.exists():
            logger.warning(
                "knowledge.source_not_found",
                path=str(path),
                inject_as=inject_as,
            )
            continue

        content = path.read_text()

        if inject_as == "reference":
            sections.append(f"\n--- Reference: {path.name} ---\n{content}")
        elif inject_as == "few_shot":
            sections.append(_format_few_shot(content, path.suffix))

    return "\n".join(sections)
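A worker config might declare knowledge sources like this (the file paths are hypothetical, for illustration only):

```yaml
knowledge_sources:
  - path: "knowledge/style_guide.md"
    inject_as: "reference"   # appended to the system prompt as-is
  - path: "knowledge/labeled_examples.jsonl"
    inject_as: "few_shot"    # reformatted as worked examples
```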

load_knowledge_silos

load_knowledge_silos(silos: list[dict[str, Any]]) -> str

Load folder-type silos and return formatted content for system prompt.

Only processes silos with type="folder". Tool-type silos are handled separately by the runner's _load_tool_providers().

Returns:

str: Concatenated content from all folder silos, with section headers. Empty string if no folder silos or no content found.

Source code in src/loom/worker/knowledge.py
def load_knowledge_silos(silos: list[dict[str, Any]]) -> str:
    """Load folder-type silos and return formatted content for system prompt.

    Only processes silos with ``type="folder"``. Tool-type silos are handled
    separately by the runner's ``_load_tool_providers()``.

    Returns:
        Concatenated content from all folder silos, with section headers.
        Empty string if no folder silos or no content found.
    """
    sections: list[str] = []

    for silo in silos:
        if silo.get("type") != "folder":
            continue

        name = silo.get("name", "unnamed")
        path = Path(silo["path"])

        if not path.is_dir():
            logger.warning("knowledge.silo_folder_not_found", path=str(path), silo=name)
            continue

        content = _load_folder_contents(path)
        if content:
            sections.append(f"--- Knowledge Silo: {name} ---\n{content}")

    return "\n\n".join(sections)

apply_silo_updates

apply_silo_updates(updates: list[dict[str, Any]], silos: list[dict[str, Any]]) -> None

Apply LLM-requested file modifications to writable folder silos.

Each update dict has:

- silo: Name of the target silo
- action: "add" | "modify" | "delete"
- filename: Target filename within the silo folder
- content: File content (for add/modify actions)

Validates:

- Target silo exists and has permissions="read_write"
- Filename has no path traversal (../)
- Action is one of the allowed values
Source code in src/loom/worker/knowledge.py
def apply_silo_updates(
    updates: list[dict[str, Any]],
    silos: list[dict[str, Any]],
) -> None:
    """Apply LLM-requested file modifications to writable folder silos.

    Each update dict has:
        - ``silo``: Name of the target silo
        - ``action``: ``"add"`` | ``"modify"`` | ``"delete"``
        - ``filename``: Target filename within the silo folder
        - ``content``: File content (for add/modify actions)

    Validates:
        - Target silo exists and has ``permissions="read_write"``
        - Filename has no path traversal (``../``)
        - Action is one of the allowed values
    """
    # Build lookup of writable folder silos
    writable: dict[str, Path] = {}
    for silo in silos:
        if silo.get("type") == "folder" and silo.get("permissions") == "read_write":
            writable[silo["name"]] = Path(silo["path"])

    for update in updates:
        silo_name = update.get("silo", "")
        action = update.get("action", "")
        filename = update.get("filename", "")
        content = update.get("content", "")

        # Validate silo is writable
        if silo_name not in writable:
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                reason="not writable or not found",
            )
            continue

        # Validate filename — no path traversal
        if ".." in filename or filename.startswith("/"):
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="path traversal",
            )
            continue

        folder = writable[silo_name]
        target = folder / filename

        # Ensure resolved path is within the silo folder
        try:
            target.resolve().relative_to(folder.resolve())
        except ValueError:
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="path escapes silo",
            )
            continue

        if action == "add":
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding="utf-8")
            logger.info("knowledge.silo_file_added", silo=silo_name, file=filename)

        elif action == "modify":
            if not target.exists():
                logger.warning(
                    "knowledge.silo_update_skipped",
                    silo=silo_name,
                    filename=filename,
                    reason="file not found for modify",
                )
                continue
            target.write_text(content, encoding="utf-8")
            logger.info("knowledge.silo_file_modified", silo=silo_name, file=filename)

        elif action == "delete":
            if target.exists():
                target.unlink()
                logger.info("knowledge.silo_file_deleted", silo=silo_name, file=filename)

        else:
            logger.warning(
                "knowledge.silo_update_skipped",
                silo=silo_name,
                action=action,
                reason="unknown action",
            )

Embeddings

EmbeddingProvider ABC and OllamaEmbeddingProvider for text embedding generation via Ollama's /api/embed endpoint.

embeddings

Embedding provider abstraction for vector generation.

Workers and tools can generate vector embeddings from text via an EmbeddingProvider. The default implementation uses Ollama's /api/embed endpoint with models like nomic-embed-text.

Example usage::

provider = OllamaEmbeddingProvider(model="nomic-embed-text")
vector = await provider.embed("some text to embed")
vectors = await provider.embed_batch(["text 1", "text 2"])

EmbeddingProvider

Bases: ABC

Common interface for generating vector embeddings from text.

dimensions abstractmethod property

dimensions: int

Return the dimensionality of embeddings produced by this provider.

embed abstractmethod async

embed(text: str) -> list[float]

Return embedding vector for the given text.

Source code in src/loom/worker/embeddings.py
@abstractmethod
async def embed(self, text: str) -> list[float]:
    """Return embedding vector for the given text."""
    ...

embed_batch abstractmethod async

embed_batch(texts: list[str]) -> list[list[float]]

Return embedding vectors for a batch of texts.

Source code in src/loom/worker/embeddings.py
@abstractmethod
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Return embedding vectors for a batch of texts."""
    ...

OllamaEmbeddingProvider

OllamaEmbeddingProvider(model: str = 'nomic-embed-text', base_url: str | None = None)

Bases: EmbeddingProvider

Generate embeddings via Ollama's /api/embed endpoint.

Uses the Ollama embedding API which supports both single and batch embedding generation. Dimensions are detected lazily from the first embedding call and cached.

Parameters:

- model (str, default 'nomic-embed-text'): Embedding model name.
- base_url (str | None, default None): Ollama server URL. Falls back to OLLAMA_URL env var, then "http://localhost:11434".
Source code in src/loom/worker/embeddings.py
def __init__(
    self,
    model: str = "nomic-embed-text",
    base_url: str | None = None,
) -> None:
    self.model = model
    self.base_url = base_url or os.environ.get("OLLAMA_URL") or "http://localhost:11434"
    self._dimensions: int | None = None
    self._client = httpx.AsyncClient(base_url=self.base_url, timeout=120.0)

dimensions property

dimensions: int

Return embedding dimensionality (detected from first call).

embed async

embed(text: str) -> list[float]

Generate embedding for a single text string.

Source code in src/loom/worker/embeddings.py
async def embed(self, text: str) -> list[float]:
    """Generate embedding for a single text string."""
    resp = await self._client.post(
        "/api/embed",
        json={"model": self.model, "input": text},
    )
    resp.raise_for_status()
    data = resp.json()
    embedding = data["embeddings"][0]

    # Cache dimensions from first call
    if self._dimensions is None:
        self._dimensions = len(embedding)

    return embedding

embed_batch async

embed_batch(texts: list[str]) -> list[list[float]]

Generate embeddings for multiple texts in one call.

Ollama's /api/embed supports batch input via the input field accepting a list of strings.

Source code in src/loom/worker/embeddings.py
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Generate embeddings for multiple texts in one call.

    Ollama's /api/embed supports batch input via the ``input`` field
    accepting a list of strings.
    """
    if not texts:
        return []

    resp = await self._client.post(
        "/api/embed",
        json={"model": self.model, "input": texts},
    )
    resp.raise_for_status()
    data = resp.json()
    embeddings = data["embeddings"]

    if self._dimensions is None and embeddings:
        self._dimensions = len(embeddings[0])

    return embeddings
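For unit tests it can help to stub the provider rather than run an Ollama server. This hypothetical FakeEmbeddingProvider satisfies the same embed/embed_batch/dimensions interface, written without the loom base class so it runs standalone:

```python
import asyncio
import hashlib


class FakeEmbeddingProvider:
    """Deterministic stand-in for EmbeddingProvider: hashes the text into
    a fixed-size pseudo-vector. Useful in tests; not a real embedding."""

    def __init__(self, dims: int = 8) -> None:
        self._dims = dims

    @property
    def dimensions(self) -> int:
        return self._dims

    async def embed(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode()).digest()
        return [b / 255.0 for b in digest[: self._dims]]

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        return [await self.embed(t) for t in texts]


provider = FakeEmbeddingProvider()
vec = asyncio.run(provider.embed("hello"))
print(len(vec))  # → 8
```

Because the vectors are derived from a hash of the input, repeated calls with the same text return identical vectors, which makes assertions in tests stable.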