Router¶
Task Router¶
router ¶
Deterministic task router. NOT an LLM -- pure logic.
Reads router_rules.yaml and routes tasks to appropriate NATS subjects based on worker_type and model_tier.
The router is the single entry point for all task dispatch.

Producers (orchestrators, CLI) publish to: loom.tasks.incoming

The router resolves the tier and re-publishes to: loom.tasks.{worker_type}.{tier}
This indirection means producers never need to know which tier or backend handles a given worker_type -- that's all controlled via router_rules.yaml.
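For illustration, a minimal router_rules.yaml might look like the following. This is a hypothetical sketch: the key names (tier_overrides, max_concurrent, tokens_per_minute) are taken from this page, but the actual schema is defined by the real file in the repo.

```yaml
# Hypothetical router_rules.yaml -- field names follow this page's
# description, not a confirmed schema.
tier_overrides:
  summarizer: local        # force all summarizer tasks to the local tier
tiers:
  local:
    max_concurrent: 4
    tokens_per_minute: 4
  standard:
    max_concurrent: 8
    tokens_per_minute: 8
  frontier:
    max_concurrent: 2
    tokens_per_minute: 2
```

With a file like this, producers keep publishing to loom.tasks.incoming unchanged; only the operator-edited rules decide where tasks land.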
Routing pipeline (in order):

1. Deserialize the incoming dict into a TaskMessage (Pydantic validation).
2. Resolve the model tier:
    a. If router_rules.yaml has a tier_override for this worker_type, use it.
    b. Otherwise, use the tier from the TaskMessage itself.
3. Check the per-tier token-bucket rate limiter:
    a. If the tier has capacity, allow the task through.
    b. If the tier is exhausted, publish to the dead-letter subject.
4. Validate that the resolved tier is a known ModelTier enum value.
5. Publish to loom.tasks.{worker_type}.{tier}.
Dead-letter handling
Tasks that cannot be routed are published to loom.tasks.dead_letter with an attached reason. This covers:

- Malformed messages that fail Pydantic validation
- Unknown tier values after override resolution
- Rate-limited tasks that exceed the tier's max_concurrent budget
Rate limiting
Per-tier token-bucket rate limiter based on max_concurrent from router_rules.yaml. Each tier gets a bucket with capacity equal to max_concurrent. A token is consumed when a task is dispatched, and one token is refilled each time the refill interval (60.0 / max_concurrent seconds) elapses. This is a simple dispatch-side throttle -- it does not track actual worker completion.
TokenBucketRateLimiter ¶
Per-tier token-bucket rate limiter.
Each tier (local, standard, frontier) gets its own bucket. The bucket capacity equals max_concurrent from router_rules.yaml. Tokens refill at a steady rate derived from max_concurrent:

refill_interval = 60.0 / max_concurrent (seconds between refills)
This means a tier with max_concurrent=4 refills one token every 15 seconds, allowing bursts up to 4 but averaging 4 dispatches per minute.
Note: This is a dispatch-side throttle only. It does not track whether workers have finished processing. For true backpressure, workers would need to report completion events that refill tokens.
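The bucket arithmetic can be sketched as follows. This is an illustrative reimplementation of the behavior described above, not the code in router.py; the class and attribute names are assumptions.

```python
import time


class Bucket:
    """Illustrative per-tier token bucket (sketch, not the real class)."""

    def __init__(self, max_concurrent: int):
        self.capacity = max_concurrent                 # bucket size
        self.tokens = max_concurrent                   # start full: allows a burst
        self.refill_interval = 60.0 / max_concurrent   # seconds per refilled token
        self.last_refill = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill one token per elapsed interval, capped at capacity.
        refills = int((now - self.last_refill) / self.refill_interval)
        if refills:
            self.tokens = min(self.capacity, self.tokens + refills)
            self.last_refill += refills * self.refill_interval
        if self.tokens > 0:
            self.tokens -= 1
            return True
        return False
```

With max_concurrent=4 this gives refill_interval=15.0: a burst of four immediate acquisitions succeeds, then one more token becomes available every 15 seconds.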
Source code in src/loom/router/router.py
try_acquire ¶
Attempt to consume one token for the given tier.
Returns True if a token was available (task may proceed). Returns False if the bucket is empty (task should be dead-lettered).
If the tier has no configured rate limit, the task is always allowed.
Source code in src/loom/router/router.py
TaskRouter ¶
Deterministic router that dispatches TaskMessages to worker queues.
This is NOT an LLM component. It contains zero inference logic. It reads routing rules from a YAML config and applies them mechanically.
Note (Strategy D -- worker-side batching): a batching layer between the router and workers could accumulate similar tasks (same worker_type + tier) and dispatch them as a single batch to reduce LLM API call overhead. It would sit here in the routing pipeline, just before the publish step.
Routing pipeline:

```
incoming task --> resolve tier --> check rate limit --> publish to worker queue
                       |                  |
                       |                  +--> dead-letter (rate limited)
                       +--> dead-letter (bad tier / validation failure)
```
Subscribes to: loom.tasks.incoming

Publishes to: loom.tasks.{worker_type}.{tier} (normal route) and loom.tasks.dead_letter (unroutable / rate-limited)
The router runs as a long-lived async process. After subscribing, it processes tasks via NATS async callbacks. The caller (cli/main.py) is responsible for keeping the event loop alive after run() returns.
Source code in src/loom/router/router.py
resolve_tier ¶
Determine the model tier for a task.
Resolution priority:

1. Worker-specific override in router_rules.yaml (tier_overrides section). This lets operators force all tasks of a given worker_type to a specific tier without changing producer code.
2. The tier specified in the TaskMessage itself (set by the producer). TaskMessage defaults to ModelTier.STANDARD if the producer does not set one.
Raises ValueError if the resolved tier string is not a valid ModelTier.
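The priority order can be sketched as a few lines of Python. This is a hedged reimplementation; the ModelTier values and the shape of the overrides mapping are assumptions based on this page.

```python
from enum import Enum


class ModelTier(str, Enum):
    LOCAL = "local"
    STANDARD = "standard"
    FRONTIER = "frontier"


def resolve_tier(worker_type: str, message_tier: str,
                 overrides: dict[str, str]) -> ModelTier:
    """Sketch of the resolution priority: override beats the message's tier."""
    tier = overrides.get(worker_type, message_tier)
    try:
        # Mirrors step 4 of the pipeline: validate against the enum.
        return ModelTier(tier)
    except ValueError:
        raise ValueError(f"unknown tier {tier!r} for worker_type {worker_type!r}")
```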
Source code in src/loom/router/router.py
route async ¶
Route a single task from loom.tasks.incoming to the correct worker queue.
This is the main routing pipeline, called once per incoming message:
1. Parse the raw dict into a TaskMessage (validates schema).
2. Resolve the tier via tier_overrides or TaskMessage default.
3. Check the per-tier rate limiter.
4. Publish to loom.tasks.{worker_type}.{tier}.
Any failure at steps 1-3 sends the task to the dead-letter subject instead of silently dropping it.
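The four steps can be sketched end to end with a stub bus. This is illustrative only: the real TaskRouter lives in src/loom/router/router.py, TaskMessage parsing is reduced to a dict check, and the publish callback stands in for NATS.

```python
# Sketch of the routing pipeline; names and envelope shape are assumptions.
DEAD_LETTER = "loom.tasks.dead_letter"


def route(raw: dict, overrides: dict, limiter, publish) -> str:
    """Return the subject the task was published to."""
    # 1. Validate the message (stand-in for Pydantic validation).
    worker_type = raw.get("worker_type")
    if not worker_type:
        publish(DEAD_LETTER, {"task": raw, "reason": "validation failed"})
        return DEAD_LETTER
    # 2. Resolve the tier: override wins over the message's own tier.
    tier = overrides.get(worker_type, raw.get("tier", "standard"))
    # 3. Check the per-tier rate limiter.
    if not limiter(tier):
        publish(DEAD_LETTER, {"task": raw, "reason": "rate limited"})
        return DEAD_LETTER
    # 4. Publish to the resolved worker queue.
    subject = f"loom.tasks.{worker_type}.{tier}"
    publish(subject, raw)
    return subject
```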
Source code in src/loom/router/router.py
run async ¶
Connect to NATS and subscribe to the incoming task subject.
After this method returns, the router is actively processing tasks
via NATS async callbacks. The caller is responsible for keeping the
event loop alive (e.g., via await asyncio.Event().wait()).
The CLI command in cli/main.py handles this:

```python
async def _run():
    await router.run()
    await asyncio.Event().wait()

asyncio.run(_run())
```
Source code in src/loom/router/router.py
process_messages async ¶
Process messages from the subscription until cancelled.
Call run() first to connect and subscribe, then call this to enter the message processing loop.
Source code in src/loom/router/router.py
Dead Letter¶
dead_letter ¶
Dead-letter consumer -- stores and replays unroutable or rate-limited tasks.
Subscribes to loom.tasks.dead_letter and maintains a bounded in-memory
list of dead-letter entries. Each entry captures the original task data,
a timestamp, and the reason for dead-lettering.
The consumer can run standalone (via CLI loom dead-letter monitor) or
be embedded in the Workshop app for UI-based inspection and replay.
Replay re-publishes a dead-letter task back to loom.tasks.incoming
so the router can attempt routing again.
ReplayRecord ¶
ReplayRecord(entry_id: str, task_id: str | None = None, worker_type: str | None = None, original_reason: str = '')
Record of a replayed dead-letter entry.
Source code in src/loom/router/dead_letter.py
to_dict ¶
Serialize to a dict for API/template consumption.
Source code in src/loom/router/dead_letter.py
DeadLetterEntry ¶
DeadLetterEntry(original_task: dict[str, Any], reason: str, task_id: str | None = None, worker_type: str | None = None)
A single dead-letter entry with metadata.
Source code in src/loom/router/dead_letter.py
to_dict ¶
Serialize to a dict for API/template consumption.
Source code in src/loom/router/dead_letter.py
DeadLetterConsumer ¶
DeadLetterConsumer(actor_id: str = 'dead-letter-consumer', max_size: int = 1000, nats_url: str = 'nats://nats:4222', *, bus: MessageBus | None = None)
Bases: BaseActor
Actor that consumes dead-letter messages and stores them in memory.
Subscribes to loom.tasks.dead_letter and maintains a bounded list
of entries (most recent first). When the list exceeds max_size,
the oldest entries are evicted.
Can be used standalone (via run()) or embedded -- call store() directly to add entries without a subscription (used by the Workshop).
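The bounded "most recent first" storage with oldest-entry eviction can be sketched with a plain list. This is an illustrative model of the documented policy, not the DeadLetterConsumer implementation.

```python
class BoundedStore:
    """Sketch of the eviction policy: newest first, oldest dropped."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self.entries: list[dict] = []

    def store(self, entry: dict) -> dict:
        self.entries.insert(0, entry)        # most recent first
        if len(self.entries) > self.max_size:
            self.entries.pop()               # evict the oldest entry
        return entry
```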
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| actor_id | str | Unique identifier for this actor instance. | 'dead-letter-consumer' |
| max_size | int | Maximum number of entries to retain. | 1000 |
| nats_url | str | NATS server URL (used only when no bus is injected). | 'nats://nats:4222' |
| bus | MessageBus \| None | Optional injected message bus (for testing or embedding). | None |
Source code in src/loom/router/dead_letter.py
handle_message async ¶
Process a dead-letter message from the bus.
Extracts reason, task_id, and worker_type from the dead-letter
envelope (as published by TaskRouter._dead_letter), stores
the entry, and logs the event.
Source code in src/loom/router/dead_letter.py
store ¶
store(original_task: dict[str, Any], reason: str, *, task_id: str | None = None, worker_type: str | None = None) -> DeadLetterEntry
Store a dead-letter entry.
Inserts at the front of the list (most recent first) and evicts
the oldest entry if max_size is exceeded.
Returns the created entry.
Source code in src/loom/router/dead_letter.py
list_entries ¶
Return a page of dead-letter entries (most recent first).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of entries to return. | 50 |
| offset | int | Number of entries to skip from the start. | 0 |

Returns:

| Type | Description |
|---|---|
| list[dict[str, Any]] | List of entry dicts. |
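Since entries are already stored newest-first, pagination here is plain list slicing. A minimal sketch, assuming that storage order:

```python
from typing import Any


def list_entries(entries: list[dict[str, Any]], limit: int = 50,
                 offset: int = 0) -> list[dict[str, Any]]:
    """Sketch: entries are newest-first, so a slice is one page."""
    return entries[offset:offset + limit]
```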
Source code in src/loom/router/dead_letter.py
count ¶
clear ¶
replay async ¶
Re-publish a dead-letter task back to loom.tasks.incoming.
Finds the entry by ID, publishes its original_task to the
incoming subject, removes it from the stored list, and records
the replay in the audit log.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry_id | str | The UUID of the entry to replay. | required |
| bus | MessageBus | The message bus to publish the replayed task on. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the entry was found and replayed, False otherwise. |
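The replay steps (lookup by ID, re-publish, removal) can be sketched as follows. This is a simplified synchronous model: the audit log is omitted and the bus is reduced to a publish callback, both assumptions for illustration.

```python
INCOMING = "loom.tasks.incoming"


def replay(entries: list[dict], entry_id: str, publish) -> bool:
    """Sketch: find the entry by ID, re-publish its task, drop the entry."""
    for i, entry in enumerate(entries):
        if entry["entry_id"] == entry_id:
            # The router will attempt routing again from the top.
            publish(INCOMING, entry["original_task"])
            del entries[i]
            return True
    return False
```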
Source code in src/loom/router/dead_letter.py
replay_log ¶
Return the replay audit log (most recent first).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of records to return. | 50 |

Returns:

| Type | Description |
|---|---|
| list[dict[str, Any]] | List of replay record dicts. |