Worker¶
The loom.worker package implements the two types of Loom workers:
- **LLM Workers** (`runner.py`) — call language models with system prompts, tool-use loops, and JSON parsing. Used for summarization, classification, extraction, and analysis tasks.
- **Processor Workers** (`processor.py`) — run arbitrary Python code (no LLM). Used for data transformation, ingestion, and integration tasks.
Both types are stateless: they process one task, return a result, and reset.
See Building Workflows for the user-facing guide.
Base¶
Abstract base class for all workers (TaskWorker).
base ¶
TaskWorker base class.
Extracts the reusable worker lifecycle from the LLM-specific worker: message parsing, I/O contract validation, result publishing, timing, and error handling. Subclasses implement process() to do the actual work.
TaskWorker ¶
Bases: BaseActor
Generic stateless worker base.
Lifecycle per message:

1. Receive TaskMessage
2. Validate input against worker contract
3. Delegate to process() (subclass implements)
4. Validate output against worker contract
5. Publish TaskResult
6. Reset (no state retained)
Source code in src/loom/worker/base.py
on_reload
async
¶
Re-read the worker config from disk on reload signal.
handle_message
async
¶
Handle an incoming task message through the full worker lifecycle.
Source code in src/loom/worker/base.py
process
abstractmethod
async
¶
Process a task payload. Subclasses implement this.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload` | `dict[str, Any]` | Validated input dict (matches `input_schema`). | *required* |
| `metadata` | `dict[str, Any]` | Task metadata dict (routing hints, pipeline context, etc.). | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | A dict with the following structure: |

    {
        "output": dict,             # Must match output_schema
        "model_used": str | None,   # Identifier for what processed this
        "token_usage": dict | None, # {"prompt_tokens": int, ...} or empty
    }
Source code in src/loom/worker/base.py
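A minimal sketch of what a `process()` implementation returns. The `WordCountWorker` name and its fields are illustrative, not part of Loom; a real worker would subclass `TaskWorker` and declare matching input/output schemas:

```python
import asyncio
from typing import Any

class WordCountWorker:
    """Illustrative stand-in; a real worker subclasses loom's TaskWorker."""

    async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
        text = payload["text"]  # already validated against input_schema by the base class
        return {
            "output": {"word_count": len(text.split())},  # must match output_schema
            "model_used": None,   # no LLM involved in this example
            "token_usage": None,
        }

result = asyncio.run(WordCountWorker().process({"text": "one two three"}, {}))
```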
reset
async
¶
Post-task cleanup hook for subclasses.
Called after every task (success or failure) to release temporary resources (file handles, caches, scratch buffers). The default implementation is a no-op. Override in subclasses that allocate per-task resources in process().
This is the enforcement point for the statelessness invariant: any state set during process() must be cleared here.
Source code in src/loom/worker/base.py
Runner¶
LLMWorker — the main LLM worker actor. Includes execute_with_tools(),
the standalone tool-use loop shared with the Workshop test bench.
runner ¶
LLM worker actor.
Processes a single task via an LLM backend and resets. No state carries between tasks — this is enforced, not optional.
Supports tool-use: when knowledge_silos include tool-type entries, the worker offers those tools to the LLM and executes a multi-turn loop until the LLM produces a final text answer.
LLMWorker ¶
LLMWorker(actor_id: str, config_path: str, backends: dict[str, LLMBackend], nats_url: str = 'nats://nats:4222')
Bases: TaskWorker
LLM-backed stateless worker.
Extends TaskWorker with LLM-specific logic:

- Builds prompt from system_prompt + JSON payload
- Loads knowledge silos (folder content → system prompt, tools → function-calling)
- Calls the appropriate LLM backend by model tier
- Executes multi-turn tool-use loop when tools are available
- Parses JSON output from the LLM response (with fence-stripping fallback)
- Applies silo_updates for writable folder silos
Source code in src/loom/worker/runner.py
process
async
¶
Build prompt, call LLM with tool-use loop, and parse structured output.
Source code in src/loom/worker/runner.py
execute_with_tools
async
¶
execute_with_tools(backend: LLMBackend, system_prompt: str, user_message: str, tool_providers: dict[str, ToolProvider], tool_defs: list[dict[str, Any]] | None, max_tokens: int = 2000) -> dict[str, Any]
Execute an LLM call with multi-round tool-use loop.
This function encapsulates the core LLM interaction pattern used by both
LLMWorker.process() and WorkerTestRunner.run(). It handles:
- Initial LLM call with optional tool definitions
- Multi-turn tool execution loop (up to `MAX_TOOL_ROUNDS`)
- Token count aggregation across rounds
- Error handling for unknown tools and tool execution failures
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `backend` | `LLMBackend` | LLM backend to call (Anthropic, Ollama, OpenAI-compatible). | *required* |
| `system_prompt` | `str` | Full system prompt (with knowledge silo content prepended). | *required* |
| `user_message` | `str` | User message (typically JSON payload). | *required* |
| `tool_providers` | `dict[str, ToolProvider]` | Map of tool name → ToolProvider for execution. | *required* |
| `tool_defs` | `list[dict[str, Any]] \| None` | Tool definitions list for the LLM (or None for no tools). | *required* |
| `max_tokens` | `int` | Maximum output tokens per LLM call. | `2000` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with keys: `content`, `model`, `prompt_tokens`, `completion_tokens`, `tool_calls`, `stop_reason`. Token counts are aggregated across all tool-use rounds. |
Source code in src/loom/worker/runner.py
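The multi-round loop described above can be sketched as follows. This is an illustrative shape, not Loom's actual implementation; `call_llm` and `run_tool` stand in for a backend's `complete()` and a ToolProvider's `execute()`, and the fake backend at the bottom exists only to drive the sketch:

```python
import asyncio
from typing import Any

MAX_TOOL_ROUNDS = 10  # documented cap on tool-use rounds

async def tool_loop(call_llm, run_tool, user_message: str) -> dict[str, Any]:
    messages = [{"role": "user", "content": user_message}]
    prompt_tokens = completion_tokens = 0
    for _ in range(MAX_TOOL_ROUNDS):
        resp = await call_llm(messages)
        prompt_tokens += resp["prompt_tokens"]         # aggregate across rounds
        completion_tokens += resp["completion_tokens"]
        if not resp.get("tool_calls"):
            resp["prompt_tokens"] = prompt_tokens
            resp["completion_tokens"] = completion_tokens
            return resp  # final text answer
        # Execute each requested tool and feed results back as the next turn.
        messages.append({"role": "assistant", "tool_calls": resp["tool_calls"]})
        for call in resp["tool_calls"]:
            result = await run_tool(call["name"], call["arguments"])
            messages.append({"role": "tool", "tool_call_id": call["id"], "content": result})
    return resp

# Fake backend: requests one tool call, then answers with text.
_round = {"n": 0}

async def fake_llm(messages):
    _round["n"] += 1
    if _round["n"] == 1:
        return {"content": None, "prompt_tokens": 5, "completion_tokens": 2,
                "tool_calls": [{"id": "1", "name": "lookup", "arguments": {}}],
                "stop_reason": "tool_use"}
    return {"content": "done", "prompt_tokens": 7, "completion_tokens": 3,
            "tool_calls": None, "stop_reason": "end_turn"}

async def fake_tool(name, arguments):
    return '{"value": 42}'

out = asyncio.run(tool_loop(fake_llm, fake_tool, "hi"))
```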
Backends¶
LLM backend implementations: AnthropicBackend, OllamaBackend,
OpenAICompatibleBackend. Plus build_backends_from_env() for automatic
backend detection from environment variables and ~/.loom/config.yaml.
backends ¶
LLM backend adapters — uniform interface for local and API models.
Each backend wraps a specific LLM provider's API and normalizes the response into a consistent dict format. Workers never call APIs directly; they always go through a backend.
To add a new backend
- Subclass LLMBackend
- Implement complete() returning the standard response dict
- Register it in cli/main.py's worker command (backend resolution by tier)
All backends use httpx with a 120s timeout. Adjust if your models are slow.
Tool-use support
Backends accept optional tools and messages parameters. When
tools is provided, the LLM may return tool_calls instead of content.
When messages is provided, it replaces the single user_message for
multi-turn conversations (tool execution loop).
LLMBackend ¶
Bases: ABC
Common interface all model backends implement.
complete
abstractmethod
async
¶
complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]
Complete an LLM request and return a normalized response dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `system_prompt` | `str` | System instructions for the LLM. | *required* |
| `user_message` | `str` | User message (ignored when `messages` is provided). | *required* |
| `max_tokens` | `int` | Maximum tokens in the response. | `2000` |
| `temperature` | `float` | Sampling temperature. | `0.0` |
| `tools` | `list[dict[str, Any]] \| None` | Optional list of tool definitions for function-calling. | `None` |
| `messages` | `list[dict[str, Any]] \| None` | Optional full message history for multi-turn. When provided, overrides `user_message`. | `None` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | A dict with the following structure: |

    {
        "content": str | None,      # Text response (None if tool_calls)
        "model": str,               # Model identifier
        "prompt_tokens": int,
        "completion_tokens": int,
        "tool_calls": list | None,  # [{"id": str, "name": str, "arguments": dict}]
        "stop_reason": str | None,  # "end_turn" | "tool_use"
    }
Source code in src/loom/worker/backends.py
AnthropicBackend ¶
Bases: LLMBackend
Claude API via httpx (Messages API).
Uses the Anthropic Messages API directly via httpx rather than the anthropic Python SDK — this keeps dependencies minimal and avoids version coupling.
Source code in src/loom/worker/backends.py
complete
async
¶
complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]
Complete an LLM request via the Anthropic Messages API.
Source code in src/loom/worker/backends.py
OllamaBackend ¶
Bases: LLMBackend
Local models via Ollama HTTP API.
Default base_url points to K8s service name "ollama". For local dev, override with http://localhost:11434 (set OLLAMA_URL env var).
Note: Ollama's token counts (prompt_eval_count, eval_count) may be absent for some models; we default to 0 in that case.
Source code in src/loom/worker/backends.py
complete
async
¶
complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]
Complete an LLM request via the Ollama HTTP API.
Source code in src/loom/worker/backends.py
OpenAICompatibleBackend ¶
Bases: LLMBackend
Any OpenAI-compatible API (vLLM, llama.cpp server, LiteLLM, etc.).
Source code in src/loom/worker/backends.py
complete
async
¶
complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]
Complete an LLM request via an OpenAI-compatible API.
Source code in src/loom/worker/backends.py
build_backends_from_env ¶
Build LLM backends from environment variables and ~/.loom/config.yaml.
Resolution priority: env vars > config.yaml > built-in defaults.
Resolves available backends based on which env vars are set:
- `OLLAMA_URL` → OllamaBackend for the `local` tier
- `OLLAMA_MODEL` → Override Ollama model (default: `llama3.2:3b`)
- `ANTHROPIC_API_KEY` → AnthropicBackend for `standard` + `frontier`
- `FRONTIER_MODEL` → Override frontier model (default: `claude-opus-4-20250514`)
Returns:
| Type | Description |
|---|---|
| `dict[str, LLMBackend]` | Dict mapping tier name → LLMBackend instance. May be empty if no environment variables are set. |
Source code in src/loom/worker/backends.py
Processor¶
ProcessorWorker and SyncProcessingBackend ABC for non-LLM workers.
Includes serialize_writes option and BackendError hierarchy.
processor ¶
Processor worker for non-LLM task processing.
ProcessorWorker delegates to a ProcessingBackend — any Python library, rules engine, or external tool that isn't an LLM. Examples: Docling for document extraction, ffmpeg for media, scikit-learn for classification.
This module also provides:
- `BackendError` — Base exception for processing backend failures. Backend implementations should subclass this (e.g., `DoclingConversionError`) to provide structured errors with the original cause preserved.
- `SyncProcessingBackend` — Base class for backends wrapping synchronous, CPU-bound libraries. Subclasses implement `process_sync()`, which is automatically offloaded to a thread pool via `asyncio.run_in_executor`.
BackendError ¶
Bases: Exception
Base error for processing backend failures.
Backend implementations should raise subclasses of this to provide
structured, domain-specific errors with the original cause preserved
via __cause__.
Example::

    class DoclingConversionError(BackendError):
        """Raised when Docling fails to convert a document."""

    try:
        converter.convert(path)
    except Exception as exc:
        raise DoclingConversionError(f"Failed: {exc}") from exc
ProcessingBackend ¶
Bases: ABC
Generic processing backend interface for non-LLM workers.
Implementations wrap a specific tool or library (Docling, ffmpeg, etc.) and translate between Loom's payload/output dicts and that tool's API.
process
abstractmethod
async
¶
Process a task payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload` | `dict[str, Any]` | Validated input dict from TaskMessage. | *required* |
| `config` | `dict[str, Any]` | Full worker config dict (for backend-specific settings). | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | A dict with the following structure: |

    {
        "output": dict,           # Structured output matching output_schema
        "model_used": str | None, # Identifier (e.g., "docling-v2", "ffmpeg-6.1")
    }
Source code in src/loom/worker/processor.py
SyncProcessingBackend ¶
Bases: ProcessingBackend
Base class for backends wrapping synchronous, CPU-bound libraries.
Subclasses implement process_sync() instead of process().
The synchronous method is automatically offloaded to a thread pool
via asyncio.run_in_executor so the async event loop stays
responsive.
If serialize_writes=True, an asyncio.Lock ensures only one
call to process_sync runs at a time. Use this for backends
that write to single-writer stores like DuckDB.
Use this for backends that wrap libraries like Docling, ffmpeg, scikit-learn, or any other tool that performs blocking I/O or CPU-intensive computation.
Example::

    class FFmpegBackend(SyncProcessingBackend):
        def process_sync(self, payload, config):
            # CPU-bound work — runs in thread pool automatically
            subprocess.run(["ffmpeg", ...])
            return {"output": {...}, "model_used": "ffmpeg"}
Source code in src/loom/worker/processor.py
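The offload-plus-lock mechanics can be sketched like this. `SerializedBackend` is an invented stand-in that inlines what loom's `SyncProcessingBackend` handles internally; it is not the real base class:

```python
import asyncio

class SerializedBackend:
    """Sketch of serialize_writes: at most one process_sync call runs at a time."""

    def __init__(self, serialize_writes: bool = False):
        self._lock = asyncio.Lock() if serialize_writes else None

    def process_sync(self, payload, config):
        # Blocking, CPU-bound work would go here.
        return {"output": {"n": payload["n"]}, "model_used": "demo"}

    async def process(self, payload, config):
        loop = asyncio.get_running_loop()
        if self._lock:
            # Single-writer semantics (e.g., for DuckDB): hold the lock
            # across the thread-pool call so writes never overlap.
            async with self._lock:
                return await loop.run_in_executor(None, self.process_sync, payload, config)
        return await loop.run_in_executor(None, self.process_sync, payload, config)

result = asyncio.run(SerializedBackend(serialize_writes=True).process({"n": 1}, {}))
```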
process_sync
abstractmethod
¶
Process a task payload synchronously.
This method runs in a thread pool — do not use await here.
Return format is identical to ProcessingBackend.process().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `payload` | `dict[str, Any]` | Validated input dict from TaskMessage. | *required* |
| `config` | `dict[str, Any]` | Full worker config dict (for backend-specific settings). | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Same structure as `ProcessingBackend.process()`. |
Source code in src/loom/worker/processor.py
process
async
¶
Offload process_sync() to a thread pool and return the result.
If serialize_writes was set, acquires the write lock first to
ensure single-writer semantics (e.g., for DuckDB).
Source code in src/loom/worker/processor.py
ProcessorWorker ¶
ProcessorWorker(actor_id: str, config_path: str, backend: ProcessingBackend, nats_url: str = 'nats://nats:4222')
Bases: TaskWorker
Non-LLM stateless worker.
Delegates processing to a ProcessingBackend instead of an LLM. Follows the same lifecycle as LLMWorker: validate input, process, validate output, publish result.
Source code in src/loom/worker/processor.py
process
async
¶
Delegate processing to the backend and return the result.
Source code in src/loom/worker/processor.py
Tools¶
ToolProvider ABC and SyncToolProvider for LLM function-calling tools.
Workers can expose tools that the LLM calls during processing (max 10 rounds).
tools ¶
Tool provider abstraction for LLM function-calling.
Workers can offer tools to LLMs via their config's knowledge_silos key.
Each tool-type silo specifies a provider class path (fully qualified,
like processing_backend) and a config dict passed to the constructor.
Tool providers define what the LLM can call (via get_definition()) and
execute the call when the LLM invokes it (via execute()).
Example config::

    knowledge_silos:
      - name: "document_catalog"
        type: "tool"
        provider: "docman.tools.duckdb_view.DuckDBViewTool"
        config:
          db_path: "/tmp/docman-workspace/docman.duckdb"
          view_name: "document_summaries"
ToolProvider ¶
Bases: ABC
A tool that can be offered to an LLM for function-calling.
Subclasses define the tool's JSON Schema definition and implement the execution logic. The LLMWorker manages the multi-turn loop: it passes tool definitions to the backend, receives tool_calls, dispatches to the appropriate provider, and feeds results back.
get_definition
abstractmethod
¶
Return the tool definition in standard JSON Schema format.
The returned dict must contain:

- `name`: Tool name (alphanumeric + underscores)
- `description`: What the tool does (shown to LLM)
- `parameters`: JSON Schema object for the tool's arguments
Example::

    {
        "name": "search_documents",
        "description": "Full-text search over document catalog",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 10},
            },
            "required": ["query"],
        },
    }
Source code in src/loom/worker/tools.py
execute
abstractmethod
async
¶
Execute the tool with LLM-provided arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `arguments` | `dict[str, Any]` | Parsed arguments matching the tool's `parameters` schema. | *required* |
Returns:
| Type | Description |
|---|---|
| `str` | Result as a string (typically JSON). This is sent back to the LLM as the tool result in the next turn. |
Source code in src/loom/worker/tools.py
SyncToolProvider ¶
Bases: ToolProvider
Convenience base for synchronous tool implementations.
Subclasses implement execute_sync() which is automatically offloaded
to a thread pool. Use this for tools that wrap synchronous libraries
(e.g., DuckDB queries, file I/O).
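The thread-pool offload can be sketched like this. `SquareTool` is an invented example that inlines what `SyncToolProvider`'s base `execute()` does conceptually; real tools just subclass it and implement `execute_sync()`:

```python
import asyncio
import json
from typing import Any

class SquareTool:
    """Illustrative sync tool; real tools subclass loom's SyncToolProvider."""

    def execute_sync(self, arguments: dict[str, Any]) -> str:
        # Blocking work (DuckDB query, file I/O, ...) goes here.
        return json.dumps({"result": arguments["n"] ** 2})

    async def execute(self, arguments: dict[str, Any]) -> str:
        # What the base class does conceptually: offload so the event
        # loop stays responsive while the blocking call runs.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.execute_sync, arguments)

result = json.loads(asyncio.run(SquareTool().execute({"n": 7})))
```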
load_tool_provider ¶
Import and instantiate a ToolProvider by fully qualified class path.
Follows the same dynamic-import pattern as _load_processing_backend
in cli/main.py.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `class_path` | `str` | Fully qualified dotted path to the ToolProvider class. | *required* |
| `config` | `dict[str, Any]` | Dict of keyword arguments passed to the constructor. | *required* |
Returns:
| Type | Description |
|---|---|
| `ToolProvider` | An instantiated ToolProvider. |
Raises:
| Type | Description |
|---|---|
| `ImportError` | If the module cannot be imported. |
| `AttributeError` | If the class is not found in the module. |
| `TypeError` | If the class is not a ToolProvider subclass. |
Source code in src/loom/worker/tools.py
Knowledge¶
Knowledge silo loading, injection into system prompts, and write-back. Supports read-only, read-write, and tool-based knowledge sources.
knowledge ¶
Scoped knowledge/RAG loader for worker context injection.
Workers can have knowledge sources defined in their config YAML under
a knowledge_sources key. This module loads those files and formats them
for injection into the system prompt, giving workers domain-specific context.
Knowledge silos extend this with folder-based knowledge::

    knowledge_silos:
      - name: "classification_guides"
        type: "folder"
        path: "knowledge/classification/"
        permissions: "read"   # "read" or "read_write"
Folder silos load all text files from a directory into the system prompt.
Writable silos accept silo_updates from the LLM output to persist
learned patterns (add/modify/delete files within the silo folder).
load_knowledge_sources ¶
Load knowledge sources and format them for system prompt injection.
Each source has:

- `path`: file path to the knowledge file
- `inject_as`: `"reference"` (append to prompt) or `"few_shot"` (format as examples)
Source code in src/loom/worker/knowledge.py
load_knowledge_silos ¶
Load folder-type silos and return formatted content for system prompt.
Only processes silos with type="folder". Tool-type silos are handled
separately by the runner's _load_tool_providers().
Returns:
| Type | Description |
|---|---|
| `str` | Concatenated content from all folder silos, with section headers. Empty string if no folder silos or no content found. |
Source code in src/loom/worker/knowledge.py
apply_silo_updates ¶
Apply LLM-requested file modifications to writable folder silos.
Each update dict has:

- `silo`: Name of the target silo
- `action`: `"add"` | `"modify"` | `"delete"`
- `filename`: Target filename within the silo folder
- `content`: File content (for add/modify actions)
Validates:

- Target silo exists and has `permissions="read_write"`
- Filename has no path traversal (`../`)
- Action is one of the allowed values
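The checks above can be sketched as a standalone predicate. This is an illustrative helper with example data, not loom's actual validation, which lives inside `apply_silo_updates`:

```python
from pathlib import PurePosixPath

ALLOWED_ACTIONS = {"add", "modify", "delete"}

def is_valid_update(update: dict, writable_silos: set[str]) -> bool:
    """Mirror the documented checks: writable silo, allowed action, safe filename."""
    filename = PurePosixPath(update.get("filename", ""))
    return (
        update.get("silo") in writable_silos          # silo exists and is read_write
        and update.get("action") in ALLOWED_ACTIONS   # allowed action
        and not filename.is_absolute()
        and ".." not in filename.parts                # no path traversal
    )

ok = is_valid_update(
    {"silo": "classification_guides", "action": "modify",
     "filename": "patterns.md", "content": "..."},
    writable_silos={"classification_guides"},
)
bad = is_valid_update(
    {"silo": "classification_guides", "action": "add", "filename": "../escape.md"},
    writable_silos={"classification_guides"},
)
```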
Source code in src/loom/worker/knowledge.py
Embeddings¶
EmbeddingProvider ABC and OllamaEmbeddingProvider for text embedding
generation via Ollama's /api/embed endpoint.
embeddings ¶
Embedding provider abstraction for vector generation.
Workers and tools can generate vector embeddings from text via an EmbeddingProvider. The default implementation uses Ollama's /api/embed endpoint with models like nomic-embed-text.
Example usage::

    provider = OllamaEmbeddingProvider(model="nomic-embed-text")
    vector = await provider.embed("some text to embed")
    vectors = await provider.embed_batch(["text 1", "text 2"])
EmbeddingProvider ¶
OllamaEmbeddingProvider ¶
Bases: EmbeddingProvider
Generate embeddings via Ollama's /api/embed endpoint.
Uses the Ollama embedding API which supports both single and batch embedding generation. Dimensions are detected lazily from the first embedding call and cached.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `str` | Embedding model name (default: "nomic-embed-text"). | `'nomic-embed-text'` |
| `base_url` | `str \| None` | Ollama server URL. Falls back to `OLLAMA_URL` env var, then "http://localhost:11434". | `None` |
Source code in src/loom/worker/embeddings.py
embed
async
¶
Generate embedding for a single text string.
Source code in src/loom/worker/embeddings.py
embed_batch
async
¶
Generate embeddings for multiple texts in one call.
Ollama's /api/embed supports batch input via the input field
accepting a list of strings.