Core¶
The loom.core package contains the foundational abstractions that all Loom
components build on: actors, messages, configuration, I/O contracts, app
manifests, and workspace file management.
You rarely use these directly — they're used by workers, orchestrators, and the CLI. See Building Workflows for the user-facing guide.
Actor¶
The base class for all NATS-connected actors (workers, routers, orchestrators).
actor ¶
Base actor class — the foundation of Loom's actor model.
All Loom actors (workers, orchestrators, routers) inherit from BaseActor. This class handles the message bus subscription lifecycle, message dispatch, signal-based shutdown, and error isolation. Each actor is an independent process with no shared memory.
Design invariant: actors communicate ONLY through bus messages (see messages.py). Direct method calls between actors are forbidden.
The message bus is pluggable via the bus constructor parameter. The default
is NATSBus (created from nats_url when no bus is provided). For testing,
pass an InMemoryBus instead.
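As a rough illustration of why a pluggable bus helps in tests, the sketch below injects a toy in-memory bus into an actor-like class. It is not Loom's code: `Bus`, `ToyInMemoryBus`, `ToyActor`, and the subject name `loom.tasks.demo` are hypothetical, and the real MessageBus interface may differ.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Protocol

Handler = Callable[[dict], Awaitable[None]]


class Bus(Protocol):
    """Hypothetical minimal bus interface (assumption, not Loom's MessageBus)."""

    async def subscribe(self, subject: str, handler: Handler) -> None: ...
    async def publish(self, subject: str, message: dict) -> None: ...


class ToyInMemoryBus:
    """Delivers messages to local handlers; no network involved."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    async def subscribe(self, subject: str, handler: Handler) -> None:
        self._handlers[subject].append(handler)

    async def publish(self, subject: str, message: dict) -> None:
        for handler in list(self._handlers[subject]):
            await handler(message)


class ToyActor:
    """Receives its bus through the constructor instead of creating one."""

    def __init__(self, actor_id: str, bus: Bus) -> None:
        self.actor_id = actor_id
        self.bus = bus
        self.received = []

    async def start(self, subject: str) -> None:
        await self.bus.subscribe(subject, self.handle_message)

    async def handle_message(self, message: dict) -> None:
        self.received.append(message)


async def demo() -> list:
    bus = ToyInMemoryBus()
    actor = ToyActor("worker-1", bus=bus)
    await actor.start("loom.tasks.demo")
    await bus.publish("loom.tasks.demo", {"task_id": "t-1"})
    await bus.publish("loom.tasks.other", {"task_id": "t-2"})  # no subscriber
    return actor.received


received = asyncio.run(demo())
```

Because the actor only sees the bus through its constructor argument, the same class can run against a real broker in production and an in-process stand-in in unit tests.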
BaseActor ¶
BaseActor(actor_id: str, nats_url: str = 'nats://nats:4222', max_concurrent: int = 1, *, bus: MessageBus | None = None)
Bases: ABC
Actor model base class.
Each actor:
- Subscribes to a message bus subject
- Processes messages with configurable concurrency (default: 1 = strict ordering)
- Communicates only through structured messages
- Has isolated state (no shared memory)
- Shuts down gracefully on SIGTERM/SIGINT
Concurrency can be configured via max_concurrent. Values > 1 allow parallel message processing within a single actor instance — use with care, as it relaxes ordering guarantees. Horizontal scaling via queue groups (multiple replicas) is the preferred way to increase throughput while preserving per-message isolation.
The message bus can be injected via the bus keyword argument. If omitted,
a NATSBus is created from nats_url (backward-compatible default).
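To make the effect of max_concurrent concrete, here is a minimal sketch that bounds in-flight handlers with an `asyncio.Semaphore`. It is not taken from actor.py; `process_all`, `guarded`, and `demo` are hypothetical names, and the real dispatch loop may be built differently.

```python
import asyncio


async def process_all(messages, handler, max_concurrent=1):
    """Run handler over messages with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(message):
        async with semaphore:
            await handler(message)

    await asyncio.gather(*(guarded(m) for m in messages))


async def demo(max_concurrent):
    completed = []

    async def handler(message):
        # Earlier messages sleep longer, so they finish later
        # whenever several handlers are allowed to overlap.
        await asyncio.sleep(0.01 * (3 - message))
        completed.append(message)

    await process_all([0, 1, 2], handler, max_concurrent)
    return completed


strict = asyncio.run(demo(max_concurrent=1))   # handlers run one at a time
relaxed = asyncio.run(demo(max_concurrent=3))  # handlers may overlap
```

With a limit of 1 the completion order matches the arrival order; with a higher limit the completion order depends on how long each handler takes, which is the ordering relaxation described above.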
Source code in src/loom/core/actor.py
connect async ¶
disconnect async ¶
Unsubscribe and close the message bus connection.
Source code in src/loom/core/actor.py
subscribe async ¶
Subscribe to a bus subject.
Queue group enables competing consumers (multiple worker replicas share load).
Source code in src/loom/core/actor.py
publish async ¶
on_reload async ¶
Config reload hook — called when a control reload message arrives.
Subclasses override this to re-read their config from disk. The default implementation is a no-op.
run async ¶
Main actor loop — subscribe, process messages, and handle shutdown.
This method blocks until a shutdown signal (SIGTERM/SIGINT) is received or the bus connection drops. Messages are processed with bounded concurrency controlled by max_concurrent (default 1 = strict ordering).
A background control listener subscribes to loom.control.reload
to support hot-reloading of actor configs without restart.
Graceful shutdown sequence:
1. Signal received -> _request_shutdown() sets the shutdown event
2. Message loop breaks after finishing in-flight messages
3. Control listener is cancelled
4. Actor disconnects from the bus (drains pending publishes)
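The shutdown sequence can be sketched with an `asyncio.Event` that a signal handler sets and the message loop polls. This is an illustration under stated assumptions, not the code in actor.py: `install_signal_handlers`, `run_loop`, and the queue standing in for the bus subscription are all hypothetical.

```python
import asyncio
import signal


def install_signal_handlers(shutdown: asyncio.Event) -> None:
    """Set the shutdown event on SIGTERM/SIGINT (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown.set)


async def run_loop(inbox: asyncio.Queue, shutdown: asyncio.Event, handled: list) -> None:
    """Process messages until the shutdown event is set."""
    while not shutdown.is_set():
        try:
            # Poll with a timeout so the shutdown flag is re-checked regularly.
            message = await asyncio.wait_for(inbox.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        handled.append(message)  # the in-flight message finishes before the next check
    # A real actor would cancel its control listener and disconnect here.


async def demo() -> list:
    inbox: asyncio.Queue = asyncio.Queue()
    shutdown = asyncio.Event()
    handled: list = []
    for i in range(3):
        inbox.put_nowait(i)
    loop_task = asyncio.create_task(run_loop(inbox, shutdown, handled))
    await asyncio.sleep(0.1)  # give the loop time to drain the queue
    shutdown.set()            # stands in for an incoming SIGTERM/SIGINT
    await loop_task
    return handled


handled = asyncio.run(demo())
```

The demo sets the event directly rather than installing real signal handlers, so it can run anywhere; in a long-running process `install_signal_handlers` would be called once at startup.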
Source code in src/loom/core/actor.py
handle_message abstractmethod async ¶
Messages¶
Typed Pydantic models for all inter-actor communication: TaskMessage,
TaskResult, OrchestratorGoal, CheckpointState, ModelTier, TaskStatus.
messages ¶
Loom message schemas — the canonical wire format.
All inter-actor communication is typed through these Pydantic models. Actors ONLY communicate through these message types; raw dicts or ad-hoc JSON are forbidden. This enforces a contract-driven architecture where every message is validated against its schema when it is constructed or parsed.
See Also
- loom.core.contracts: JSON Schema validation for payload/output dicts
- loom.bus.nats_adapter: NATS subject conventions for message routing
TaskPriority ¶
Bases: StrEnum
Priority levels for task scheduling (not yet enforced by router).
TaskStatus ¶
Bases: StrEnum
Lifecycle states for a task.
State transitions
PENDING -> PROCESSING -> COMPLETED
PENDING -> PROCESSING -> FAILED
PENDING -> PROCESSING -> RETRY -> PROCESSING (not yet implemented)
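One way to read the transitions above is as a lookup table from each state to its allowed successors. The sketch below is an interpretation, not Loom's implementation: the string values, the `ALLOWED` table, and `can_transition` are assumptions, and the enum uses `str, Enum` rather than StrEnum so it also runs on older Python versions.

```python
from enum import Enum


class TaskStatus(str, Enum):
    # String values are assumed for illustration.
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"


# Allowed successors for each state, as read from the diagram.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.PROCESSING},
    TaskStatus.PROCESSING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.RETRY},
    TaskStatus.RETRY: {TaskStatus.PROCESSING},
    TaskStatus.COMPLETED: set(),  # terminal
    TaskStatus.FAILED: set(),     # terminal
}


def can_transition(current: TaskStatus, target: TaskStatus) -> bool:
    """Return True if target is a valid next state after current."""
    return target in ALLOWED[current]
```

For example, `can_transition(TaskStatus.PENDING, TaskStatus.COMPLETED)` is False because a task has to pass through PROCESSING first.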
ModelTier ¶
Bases: StrEnum
Which model tier should handle this task.
Tiers map to backend instances configured at startup:
- LOCAL -> OllamaBackend (e.g., llama3.2:3b)
- STANDARD -> AnthropicBackend (e.g., Claude Sonnet)
- FRONTIER -> AnthropicBackend (e.g., Claude Opus)
The router may override the tier via tier_overrides in router_rules.yaml.
TaskMessage ¶
Bases: BaseModel
Message sent TO a worker actor.
The payload dict must conform to the worker's input_schema (JSON Schema). Contract validation happens in TaskWorker.handle_message(), not here.
TaskResult ¶
Bases: BaseModel
Message sent FROM a worker actor after processing.
Published to: loom.results.{parent_task_id or 'default'}
The output dict must conform to the worker's output_schema (JSON Schema).
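The subject template above falls back to 'default' when a task has no parent. A minimal sketch of that rule, assuming a hypothetical helper named `result_subject` (the real code may build the subject inline):

```python
from typing import Optional


def result_subject(parent_task_id: Optional[str]) -> str:
    """Build the results subject; tasks without a parent use 'default'."""
    return f"loom.results.{parent_task_id or 'default'}"


with_parent = result_subject("goal-42")
without_parent = result_subject(None)
```

Because the fallback uses `or`, an empty string is treated the same way as None.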
OrchestratorGoal ¶
Bases: BaseModel
Top-level goal submitted to an orchestrator.
Published to: loom.goals.incoming
The orchestrator (PipelineOrchestrator or OrchestratorActor) picks this up, decomposes it into TaskMessages, and synthesizes results.
The context dict carries domain-specific data (e.g., file_ref for doc processing).
CheckpointState ¶
Bases: BaseModel
Compressed orchestrator state for self-summarization.
When the orchestrator's conversation history exceeds a token threshold, CheckpointManager compresses it into this structure and persists to Valkey. The orchestrator can then "reboot" with a fresh context containing only the checkpoint + a small recent-interactions window.
See: loom.orchestrator.checkpoint.CheckpointManager
Configuration¶
Config loading, validation, and schema-ref resolution. Validates worker, pipeline, orchestrator, and router YAML configs.
config ¶
Configuration loading and validation utilities.
All Loom configs are YAML files. This module validates them at load time so misconfigurations fail fast at startup — not at first-message time.
Validation functions return a list of error strings (empty = valid). They do NOT raise; callers decide how to handle errors (log, abort, collect).
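The return-a-list convention can be sketched as follows. `validate_example_config` and its two checks are hypothetical and much simpler than the real validators; the point is only that the validator collects every problem and leaves the reaction to the caller.

```python
def validate_example_config(config: dict) -> list[str]:
    """Return a list of error strings; an empty list means the config is valid."""
    errors: list[str] = []

    name = config.get("name")
    if not isinstance(name, str) or not name:
        errors.append("'name' is required and must be a non-empty string")

    timeout = config.get("timeout_seconds")  # hypothetical key
    if timeout is not None:
        is_number = isinstance(timeout, (int, float)) and not isinstance(timeout, bool)
        if not is_number or timeout <= 0:
            errors.append("'timeout_seconds' must be a positive number")

    return errors


good = validate_example_config({"name": "summarizer", "timeout_seconds": 30})
bad = validate_example_config({"timeout_seconds": -5})

# The caller chooses the policy: log, collect across files, or abort startup.
report = "\n".join(f"  - {error}" for error in bad)
```

Returning all errors at once lets a CLI print a complete report in one pass instead of stopping at the first problem.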
Four config families are validated here:
- Worker configs — system prompt, I/O schemas, tier, timeout
- Pipeline configs — stage names, worker_types, input_mapping, conditions
- Orchestrator configs — name, system_prompt, checkpoint settings
- Router rules — tier_overrides, rate_limits
Scheduler and MCP configs have dedicated validators in their own modules.
See Also
- configs/workers/_template.yaml: canonical worker config reference
- loom.scheduler.config: scheduler config validation
- loom.mcp.config: MCP gateway config validation
ConfigValidationError ¶
Bases: Exception
Raised when a config file fails structural validation.
load_config ¶
Load a YAML config file and return as a dict.
When resolve_refs is True (the default), input_schema_ref and
output_schema_ref fields are resolved to JSON Schema via Pydantic
model imports. Pass resolve_refs=False when the referenced modules
may not be importable (e.g., during config-only validation).
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If the config file doesn't exist. |
| YAMLError | If the file contains invalid YAML. |
| ConfigValidationError | If the file parses to a non-dict (e.g., a list or scalar). |
Source code in src/loom/core/config.py
resolve_schema_refs ¶
Resolve input_schema_ref / output_schema_ref to JSON Schema.
If a config dict contains input_schema_ref or output_schema_ref
(a dotted Python path to a Pydantic model), import the model and call
.model_json_schema() to populate the corresponding input_schema
/ output_schema key.
An explicit input_schema / output_schema key always takes
precedence — the ref is only used when the inline schema is absent.
This function mutates and returns config for convenience.
Raises:
| Type | Description |
|---|---|
| ConfigValidationError | If the referenced path cannot be imported or the target is not a Pydantic BaseModel subclass. |
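The resolution rule (import a dotted path, call `.model_json_schema()`, never overwrite an inline schema) can be sketched with the standard library alone. Everything here is illustrative: `resolve_refs_sketch`, the local `ConfigValidationError`, and the stand-in model registered under the made-up module name `demo_schemas` are assumptions, and the sketch skips the BaseModel subclass check because it avoids importing Pydantic.

```python
import importlib
import sys
import types


class ConfigValidationError(Exception):
    """Local stand-in for the exception described above."""


def resolve_refs_sketch(config: dict) -> dict:
    """Fill input_schema/output_schema from *_ref keys when no inline schema exists."""
    for key in ("input_schema", "output_schema"):
        ref = config.get(f"{key}_ref")
        if not ref or key in config:
            continue  # an explicit inline schema always wins
        module_name, _, attr = ref.rpartition(".")
        try:
            model = getattr(importlib.import_module(module_name), attr)
        except (ImportError, AttributeError, ValueError) as exc:
            raise ConfigValidationError(f"cannot import {ref!r}: {exc}") from exc
        config[key] = model.model_json_schema()
    return config


# Stand-in for a Pydantic model so the sketch runs without third-party packages.
class StandInModel:
    @classmethod
    def model_json_schema(cls) -> dict:
        return {"type": "object", "properties": {"text": {"type": "string"}}}


demo_module = types.ModuleType("demo_schemas")
demo_module.StandInModel = StandInModel
sys.modules["demo_schemas"] = demo_module

resolved = resolve_refs_sketch({"input_schema_ref": "demo_schemas.StandInModel"})
kept = resolve_refs_sketch(
    {"input_schema": {"type": "object"}, "input_schema_ref": "demo_schemas.StandInModel"}
)
```

In the second call the inline schema is left untouched, which mirrors the precedence rule stated above.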
Source code in src/loom/core/config.py
validate_worker_config ¶
Validate a worker config dict.
Checks for:
- Required keys (name; system_prompt for LLM workers, processing_backend for processor workers)
- Correct types for all known keys
- Valid model tier values
- Valid JSON Schema structure for input_schema/output_schema
- Knowledge silos structural integrity
- Numeric bounds (timeouts, token limits)
Source code in src/loom/core/config.py
validate_pipeline_config ¶
Validate a pipeline orchestrator config.
Checks for:
- Required keys (name, pipeline_stages)
- Each stage has name and worker_type
- Stage names are unique
- input_mapping values are valid dot-notation paths
- depends_on references exist as stage names
- condition syntax is valid (3-part: path op value)
- Tier values are valid
- No circular dependencies (basic check)
Source code in src/loom/core/config.py
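The circular-dependency check in validate_pipeline_config is described only as "basic", so the following is one possible approach rather than the actual implementation: a depth-first search over depends_on that returns the first cycle it finds. `find_cycle` and the stage dicts are hypothetical.

```python
from typing import Optional


def find_cycle(stages: list) -> Optional[list]:
    """Return one dependency cycle as a list of stage names, or None."""
    deps = {stage["name"]: list(stage.get("depends_on", [])) for stage in stages}
    unvisited, in_progress, done = 0, 1, 2
    state = {name: unvisited for name in deps}
    path = []

    def visit(name: str) -> Optional[list]:
        state[name] = in_progress
        path.append(name)
        for dep in deps[name]:
            # Unknown stage names are ignored here; that is a separate check.
            dep_state = state.get(dep, done)
            if dep_state == in_progress:
                return path[path.index(dep):] + [dep]
            if dep_state == unvisited:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        state[name] = done
        return None

    for name in deps:
        if state[name] == unvisited:
            found = visit(name)
            if found:
                return found
    return None


linear = [
    {"name": "extract"},
    {"name": "classify", "depends_on": ["extract"]},
    {"name": "summarize", "depends_on": ["classify"]},
]
cyclic = [
    {"name": "a", "depends_on": ["c"]},
    {"name": "b", "depends_on": ["a"]},
    {"name": "c", "depends_on": ["b"]},
]

no_cycle = find_cycle(linear)
cycle = find_cycle(cyclic)
```

Returning the cycle itself, rather than a boolean, makes it easy to build an error string that names the offending stages.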
validate_orchestrator_config ¶
Validate a dynamic orchestrator (OrchestratorActor) config.
Checks for:
- Required keys (name)
- system_prompt presence (warned, not an error; could be in config)
- checkpoint settings structure
- Numeric bounds (timeouts, concurrency)
- available_workers structure
Source code in src/loom/core/config.py
validate_router_rules ¶
Validate router_rules.yaml.
Checks for:
- Top-level must be a dict
- tier_overrides values are valid ModelTier values
- rate_limits have valid structure (max_concurrent > 0)
- rate_limits keys are valid tier names
Source code in src/loom/core/config.py
Contracts¶
Input/output contract validation. Ensures messages match their declared JSON Schema, with correct bool/int distinction.
contracts ¶
Lightweight JSON Schema validation for I/O contracts.
We avoid the jsonschema dependency — this covers the 90% case (required fields + shallow type checks for object properties). This is intentionally simple: every worker has an input_schema and output_schema defined in its YAML config, and this module validates payloads against those schemas at message boundaries.
Limitations (by design, to keep the dependency tree minimal):
- No nested object validation (only top-level properties)
- No array item type validation
- No min/max, pattern, enum, or format constraints
- No $ref or schema composition (allOf, oneOf, anyOf)
If you need full Draft 2020-12 validation, add jsonschema to dependencies
and swap this module. The validate_input/validate_output API stays the same.
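A shallow validator of the kind described here fits in a few lines. The sketch below is not the code in contracts.py; `validate_shallow` and the error wording are assumptions. It does show the bool/int distinction: in Python `bool` is a subclass of `int`, so `True` would pass a naive `isinstance(value, int)` check unless it is excluded explicitly.

```python
TYPE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    # bool is a subclass of int, so exclude it for integer and number.
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "object": lambda v: isinstance(v, dict),
    "array": lambda v: isinstance(v, list),
}


def validate_shallow(data: dict, schema: dict) -> list[str]:
    """Check required fields and top-level property types only."""
    errors: list[str] = []
    for field in schema.get("required", []):
        if field not in data:
            errors.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        if field not in data:
            continue
        expected = spec.get("type")
        check = TYPE_CHECKS.get(expected) if isinstance(expected, str) else None
        if check and not check(data[field]):
            actual = type(data[field]).__name__
            errors.append(f"{field}: expected {expected}, got {actual}")
    return errors


schema = {
    "type": "object",
    "required": ["text", "count"],
    "properties": {"text": {"type": "string"}, "count": {"type": "integer"}},
}

ok = validate_shallow({"text": "hello", "count": 3}, schema)
bool_as_int = validate_shallow({"text": "hello", "count": True}, schema)
missing = validate_shallow({"text": "hello"}, schema)
```

Nested objects, array items, and constraint keywords are deliberately ignored, matching the limitations listed above.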
validate_input ¶
Validate a task's input payload against the worker's input_schema.
Returns an empty list if valid, or a list of human-readable error strings.
Source code in src/loom/core/contracts.py
validate_output ¶
Validate a worker's output against its output_schema.
Returns an empty list if valid, or a list of human-readable error strings.
Source code in src/loom/core/contracts.py
Manifest¶
App bundle manifest model (AppManifest). Used by the Workshop's app deployment
system for ZIP bundle validation.
manifest ¶
App manifest schema for Loom application bundles.
A Loom app is a ZIP archive containing worker/pipeline/scheduler configs,
optional scripts, and a manifest.yaml describing the app metadata and
entry points. Apps are deployed via the Workshop UI or CLI.
Manifest format::
name: "myapp"
version: "1.0.0"
description: "My Loom application"
loom_version: ">=0.4.0"
required_extras: [duckdb, mcp]
python_package:  # optional; for apps with Python code
  name: "myapp"
  install_path: "src/"
entry_configs:
  workers:
    - config: "configs/workers/my_worker.yaml"
      tier: "standard"
  pipelines:
    - config: "configs/orchestrators/my_pipeline.yaml"
  schedulers:
    - config: "configs/schedulers/my_schedule.yaml"
  mcp:
    - config: "configs/mcp/my_mcp.yaml"
scripts:
  - path: "scripts/setup.py"
    description: "Initial setup script"
PythonPackage ¶
Bases: BaseModel
Optional Python package included in the app bundle.
EntryConfigRef ¶
Bases: BaseModel
Reference to a config file within the app bundle.
EntryConfigs ¶
Bases: BaseModel
Entry point configurations that the app exposes.
ScriptRef ¶
Bases: BaseModel
Reference to a script included in the app bundle.
AppManifest ¶
Bases: BaseModel
Loom application manifest.
Describes the contents and metadata of a Loom app bundle (ZIP archive).
The manifest is stored as manifest.yaml at the root of the ZIP.
validate_name classmethod ¶
App name must be a valid identifier (lowercase, hyphens, underscores).
Source code in src/loom/core/manifest.py
validate_version classmethod ¶
Version must be semantic (major.minor.patch, optional pre-release).
Source code in src/loom/core/manifest.py
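The name and version rules lend themselves to regular expressions. The patterns below are guesses at what such checks might look like, not the ones in manifest.py: `NAME_RE`, `VERSION_RE`, and the helper functions are hypothetical, and the exact character rules (for example whether a name may start with a digit) are assumptions.

```python
import re

# Assumed rule: lowercase letter first, then lowercase letters, digits, '-' or '_'.
NAME_RE = re.compile(r"^[a-z][a-z0-9_-]*$")

# Assumed rule: major.minor.patch with an optional pre-release suffix such as "-rc.1".
VERSION_RE = re.compile(r"^\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$")


def is_valid_name(name: str) -> bool:
    return bool(NAME_RE.match(name))


def is_valid_version(version: str) -> bool:
    return bool(VERSION_RE.match(version))
```

Under these assumed patterns, "myapp" and "1.0.0-rc.1" are accepted, while "My App" and "1.0" are rejected.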
validate_app_manifest ¶
Validate a manifest dict.
Returns a list of error strings (empty = valid).
Source code in src/loom/core/manifest.py
load_manifest ¶
Load and validate a manifest from a YAML file.
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If the manifest file doesn't exist. |
ValueError
|
If the manifest is invalid. |
Source code in src/loom/core/manifest.py
Workspace¶
File-ref resolution with path traversal protection. Maps file_ref fields in
task payloads to actual file contents.
workspace ¶
Workspace manager for file-ref resolution and safe file I/O.
Many Loom pipelines pass large data between stages via file references rather than inlining content in NATS messages. This module provides a centralized utility for resolving those references safely:
- Path traversal protection (prevents ``../../../etc/passwd`` escapes)
- File existence validation
- JSON read/write helpers with structured error handling
Usage from a ProcessingBackend::
from loom.core.workspace import WorkspaceManager
ws = WorkspaceManager("/tmp/my-workspace")
path = ws.resolve("report.pdf") # validated absolute path
data = ws.read_json("report_extracted.json") # parsed dict
ws.write_json("output.json", {"key": "value"})
Usage from worker config YAML (for LLMWorker file-ref resolution)::
workspace_dir: "/tmp/my-workspace"
resolve_file_refs: ["file_ref"] # payload fields to resolve
See Also
- loom.worker.runner.LLMWorker: resolves file_refs before building prompt
- loom.worker.processor.SyncProcessingBackend: base class for sync backends
WorkspaceManager ¶
Manages file operations within a bounded workspace directory.
All file access is restricted to the workspace boundary. Any attempt
to reference a file outside the workspace (via ../ or symlinks)
raises ValueError.
Attributes:
| Name | Type | Description |
|---|---|---|
| workspace_dir | | Absolute path to the workspace root directory. |
Source code in src/loom/core/workspace.py
resolve ¶
Resolve a file reference to a validated absolute path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_ref | str | Relative filename within the workspace. | required |
Returns:
| Type | Description |
|---|---|
| Path | Absolute resolved path guaranteed to be within the workspace. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the resolved path escapes the workspace boundary (path traversal attack). |
| FileNotFoundError | If the resolved file does not exist. |
Source code in src/loom/core/workspace.py
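The boundary check behind resolve can be approximated with `pathlib`: resolve the candidate path (which follows symlinks and collapses `..`) and confirm that the workspace root is one of its parents. This is a sketch of the general technique, not the code in workspace.py; `resolve_in_workspace` is a hypothetical standalone function.

```python
import tempfile
from pathlib import Path


def resolve_in_workspace(workspace_dir: Path, file_ref: str) -> Path:
    """Return an absolute path inside workspace_dir or raise."""
    root = workspace_dir.resolve()
    candidate = (root / file_ref).resolve()  # follows symlinks, collapses ".."
    if candidate != root and root not in candidate.parents:
        raise ValueError(f"path escapes workspace: {file_ref!r}")
    if not candidate.exists():
        raise FileNotFoundError(f"no such file in workspace: {file_ref!r}")
    return candidate


with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    (workspace / "report.json").write_text("{}")

    inside = resolve_in_workspace(workspace, "report.json")

    try:
        resolve_in_workspace(workspace, "../outside.txt")
        traversal_blocked = False
    except ValueError:
        traversal_blocked = True

    try:
        resolve_in_workspace(workspace, "missing.json")
        missing_detected = False
    except FileNotFoundError:
        missing_detected = True
```

The traversal check runs before the existence check, so a reference that escapes the workspace raises ValueError whether or not the target exists.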
read_json ¶
Read and parse a JSON file from the workspace.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_ref | str | Relative filename of a JSON file in the workspace. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Parsed JSON content as a dict. |
Raises:
| Type | Description |
|---|---|
| ValueError | If path traversal is detected. |
| FileNotFoundError | If the file does not exist. |
| JSONDecodeError | If the file is not valid JSON. |
Source code in src/loom/core/workspace.py
read_text ¶
Read text content from a workspace file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_ref | str | Relative filename in the workspace. | required |
Returns:
| Type | Description |
|---|---|
| str | File contents as a string. |
Raises:
| Type | Description |
|---|---|
| ValueError | If path traversal is detected. |
| FileNotFoundError | If the file does not exist. |
Source code in src/loom/core/workspace.py
write_json ¶
Write a dict as JSON to the workspace.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filename | str | Target filename within the workspace. | required |
| data | dict[str, Any] | Dict to serialize as JSON. | required |
Returns:
| Type | Description |
|---|---|
| Path | Absolute path to the written file. |
Raises:
| Type | Description |
|---|---|
| OSError | If the write fails (disk full, permissions, etc.). |