Task Router

router

Deterministic task router. NOT an LLM -- pure logic.

Reads router_rules.yaml and routes tasks to appropriate NATS subjects based on worker_type and model_tier.

The router is the single entry point for all task dispatch.

Producers (orchestrators, CLI) publish to: loom.tasks.incoming
The router resolves the tier and re-publishes to: loom.tasks.{worker_type}.{tier}

This indirection means producers never need to know which tier or backend handles a given worker_type -- that's all controlled via router_rules.yaml.
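The document does not reproduce router_rules.yaml itself; a minimal hypothetical sketch, assuming only the tier_overrides and rate_limits keys the router is described as reading (the worker_type names and numbers are illustrative):

```yaml
# Hypothetical router_rules.yaml sketch. Key names match those the router
# reads (tier_overrides, rate_limits, max_concurrent); values are made up.
tier_overrides:
  summarizer: frontier    # force every summarizer task onto the frontier tier

rate_limits:
  local:
    max_concurrent: 10
  standard:
    max_concurrent: 6
  frontier:
    max_concurrent: 2
```

Operators can retier a worker_type by editing this file alone; producers keep publishing to loom.tasks.incoming unchanged.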

Routing pipeline (in order):

1. Deserialize the incoming dict into a TaskMessage (Pydantic validation).
2. Resolve the model tier:
   a. If router_rules.yaml has a tier_override for this worker_type, use it.
   b. Otherwise, use the tier from the TaskMessage itself.
3. Check the per-tier token-bucket rate limiter:
   a. If the tier has capacity, allow the task through.
   b. If the tier is exhausted, publish to the dead-letter subject.
4. Validate that the resolved tier is a known ModelTier enum value.
5. Publish to loom.tasks.{worker_type}.{tier}.
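The tier-resolution and subject-construction steps above can be sketched as a pure function (illustrative only; the real router operates on TaskMessage objects and publishes via NATS):

```python
def resolve_subject(worker_type: str, message_tier: str, overrides: dict[str, str]) -> str:
    """Mirror of steps 2 and 5: a config override wins, else the message's own tier."""
    tier = overrides.get(worker_type, message_tier)
    return f"loom.tasks.{worker_type}.{tier}"

# With an override in place, the producer's requested tier is ignored:
subject = resolve_subject("summarizer", "standard", {"summarizer": "frontier"})
```

This is why producers never hard-code a destination subject: the override map is consulted at routing time, not at publish time.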

Dead-letter handling

Tasks that cannot be routed are published to loom.tasks.dead_letter with an attached reason. This covers:

- Malformed messages that fail Pydantic validation
- Unknown tier values after override resolution
- Rate-limited tasks that exceed the tier's max_concurrent budget
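The exact dead-letter envelope is not spelled out here, but its shape can be inferred from what DeadLetterConsumer.handle_message reads (reason, task_id, worker_type, original_task). A hypothetical example:

```python
# Hypothetical dead-letter envelope. Field names are inferred from the
# consumer's handle_message; the task payload and values are illustrative.
envelope = {
    "original_task": {"task_id": "t-123", "worker_type": "summarizer"},
    "reason": "rate_limited: tier 'frontier' has no available capacity",
    "task_id": "t-123",
    "worker_type": "summarizer",
}
```

Keeping original_task intact in the envelope is what makes replay possible: the consumer can re-publish it to loom.tasks.incoming verbatim.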

Rate limiting

Per-tier token-bucket rate limiter based on max_concurrent from router_rules.yaml. Each tier gets a bucket with capacity equal to max_concurrent. A token is consumed when a task is dispatched, and one token is added back each time the refill interval elapses (refill_interval = 60 / max_concurrent seconds, i.e. max_concurrent refills per minute). This is a simple dispatch-side throttle -- it does not track actual worker completion.

TokenBucketRateLimiter

TokenBucketRateLimiter(rate_limits: dict[str, dict[str, Any]])

Per-tier token-bucket rate limiter.

Each tier (local, standard, frontier) gets its own bucket. The bucket capacity equals max_concurrent from router_rules.yaml. Tokens refill at a steady rate derived from max_concurrent:

refill_interval = 60.0 / max_concurrent  (seconds between refills)

This means a tier with max_concurrent=4 refills one token every 15 seconds, allowing bursts up to 4 but averaging 4 dispatches per minute.
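The arithmetic behind that example:

```python
max_concurrent = 4
refill_interval = 60.0 / max_concurrent  # seconds between single-token refills
```

Four refills per minute matches the bucket's capacity of 4, so sustained throughput equals max_concurrent dispatches per minute.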

Note: This is a dispatch-side throttle only. It does not track whether workers have finished processing. For true backpressure, workers would need to report completion events that refill tokens.

Source code in src/loom/router/router.py
def __init__(self, rate_limits: dict[str, dict[str, Any]]) -> None:
    # _buckets maps tier name -> {tokens, capacity, refill_interval, last_refill}
    self._buckets: dict[str, dict[str, Any]] = {}

    for tier_name, limits in rate_limits.items():
        max_concurrent = limits.get("max_concurrent", 10)
        # refill_interval: seconds between adding one token back.
        # Derived so that over 60 seconds, max_concurrent tokens are refilled.
        refill_interval = 60.0 / max_concurrent if max_concurrent > 0 else 1.0

        self._buckets[tier_name] = {
            "tokens": float(max_concurrent),
            "capacity": float(max_concurrent),
            "refill_interval": refill_interval,
            "last_refill": time.monotonic(),
        }

    logger.info(
        "rate_limiter.initialized",
        tiers={name: int(b["capacity"]) for name, b in self._buckets.items()},
    )

try_acquire

try_acquire(tier: str) -> bool

Attempt to consume one token for the given tier.

Returns True if a token was available (task may proceed). Returns False if the bucket is empty (task should be dead-lettered).

If the tier has no configured rate limit, the task is always allowed.

Source code in src/loom/router/router.py
def try_acquire(self, tier: str) -> bool:
    """Attempt to consume one token for the given tier.

    Returns True if a token was available (task may proceed).
    Returns False if the bucket is empty (task should be dead-lettered).

    If the tier has no configured rate limit, the task is always allowed.
    """
    bucket = self._buckets.get(tier)
    if bucket is None:
        # No rate limit configured for this tier -- allow unconditionally.
        return True

    self._refill(bucket)

    if bucket["tokens"] >= 1.0:
        bucket["tokens"] -= 1.0
        return True
    return False
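The burst-then-deny behavior can be demonstrated with a minimal stand-in for one tier's bucket (illustrative only -- the real class keys buckets by tier name and its _refill helper is not shown above, so the refill step here is an assumption consistent with the documented interval):

```python
import time

class MiniBucket:
    """Minimal stand-in for a single tier's token bucket (not the real class)."""

    def __init__(self, max_concurrent: int) -> None:
        self.tokens = float(max_concurrent)
        self.capacity = float(max_concurrent)
        self.refill_interval = 60.0 / max_concurrent
        self.last_refill = time.monotonic()

    def try_acquire(self) -> bool:
        # Refill one token per fully elapsed interval, capped at capacity.
        now = time.monotonic()
        whole_intervals = int((now - self.last_refill) / self.refill_interval)
        if whole_intervals:
            self.tokens = min(self.capacity, self.tokens + whole_intervals)
            self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

# A tier with max_concurrent=2 allows a burst of 2, then denies
# until the 30-second refill interval elapses.
bucket = MiniBucket(max_concurrent=2)
results = [bucket.try_acquire() for _ in range(3)]
```

In the router, that third False is what sends the task to loom.tasks.dead_letter rather than blocking the caller.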

TaskRouter

TaskRouter(config_path: str, bus: MessageBus)

Deterministic router that dispatches TaskMessages to worker queues.

This is NOT an LLM component. It contains zero inference logic. It reads routing rules from a YAML config and applies them mechanically.

Note (Strategy D -- worker-side batching): a batching layer between the router and workers could accumulate similar tasks (same worker_type + tier) and dispatch them as a single batch to reduce LLM API call overhead. It would sit at this point in the routing pipeline, before the publish step.

Routing pipeline::

incoming task --> resolve tier --> check rate limit --> publish to worker queue
                              |                   |
                              |                   +--> dead-letter (rate limited)
                              +--> dead-letter (bad tier / validation failure)

Subscribes to: loom.tasks.incoming

Publishes to:

- loom.tasks.{worker_type}.{tier} (normal route)
- loom.tasks.dead_letter (unroutable / rate-limited)

The router runs as a long-lived async process. After subscribing, it processes tasks via NATS async callbacks. The caller (cli/main.py) is responsible for keeping the event loop alive after run() returns.

Source code in src/loom/router/router.py
def __init__(self, config_path: str, bus: MessageBus) -> None:
    self.bus = bus
    self.rules = self._load_rules(config_path)

    # Build the rate limiter from config. If no rate_limits key exists,
    # the limiter is created with no buckets (everything passes).
    rate_limits = self.rules.get("rate_limits", {})
    self._rate_limiter = TokenBucketRateLimiter(rate_limits)

    # Cache the set of valid tier values for fast validation.
    self._valid_tiers = {t.value for t in ModelTier}

    logger.info(
        "router.initialized",
        tier_overrides=self.rules.get("tier_overrides", {}),
        rate_limit_tiers=list(rate_limits.keys()),
    )

resolve_tier

resolve_tier(task: TaskMessage) -> ModelTier

Determine the model tier for a task.

Resolution priority
  1. Worker-specific override in router_rules.yaml (tier_overrides section). This lets operators force all tasks of a given worker_type to a specific tier without changing producer code.
  2. The tier specified in the TaskMessage itself (set by the producer).
  3. TaskMessage defaults to ModelTier.STANDARD if not specified.

Raises ValueError if the resolved tier string is not a valid ModelTier.

Source code in src/loom/router/router.py
def resolve_tier(self, task: TaskMessage) -> ModelTier:
    """Determine the model tier for a task.

    Resolution priority:
        1. Worker-specific override in router_rules.yaml (tier_overrides section).
           This lets operators force all tasks of a given worker_type to a
           specific tier without changing producer code.
        2. The tier specified in the TaskMessage itself (set by the producer).
        3. TaskMessage defaults to ModelTier.STANDARD if not specified.

    Raises ValueError if the resolved tier string is not a valid ModelTier.
    """
    overrides = self.rules.get("tier_overrides", {})
    if task.worker_type in overrides:
        tier_str = overrides[task.worker_type]
        # Validate that the override is a known tier. This catches typos
        # in router_rules.yaml early rather than silently publishing to
        # a subject no worker will ever subscribe to.
        return ModelTier(tier_str)
    return task.model_tier

route async

route(data: dict[str, Any]) -> None

Route a single task from loom.tasks.incoming to the correct worker queue.

This is the main routing pipeline, called once per incoming message:

1. Parse the raw dict into a TaskMessage (validates schema).
2. Resolve the tier via tier_overrides or TaskMessage default.
3. Check the per-tier rate limiter.
4. Publish to loom.tasks.{worker_type}.{tier}.

Any failure at steps 1-3 sends the task to the dead-letter subject instead of silently dropping it.

Source code in src/loom/router/router.py
async def route(self, data: dict[str, Any]) -> None:
    """Route a single task from loom.tasks.incoming to the correct worker queue.

    This is the main routing pipeline, called once per incoming message:

        1. Parse the raw dict into a TaskMessage (validates schema).
        2. Resolve the tier via tier_overrides or TaskMessage default.
        3. Check the per-tier rate limiter.
        4. Publish to loom.tasks.{worker_type}.{tier}.

    Any failure at steps 1-3 sends the task to the dead-letter subject
    instead of silently dropping it.
    """
    ctx = extract_trace_context(data)
    with _tracer.start_as_current_span("router.route", context=ctx) as span:
        # Step 1: Deserialize and validate the incoming message.
        try:
            task = TaskMessage(**data)
        except Exception as exc:
            span.record_exception(exc)
            await self._dead_letter(
                data,
                reason=f"invalid_task_message: {exc}",
                task_id=data.get("task_id"),
                worker_type=data.get("worker_type"),
            )
            return

        span.set_attribute("task.id", task.task_id)
        span.set_attribute("task.worker_type", task.worker_type)

        # Step 2: Resolve the model tier.
        try:
            tier = self.resolve_tier(task)
        except ValueError as exc:
            span.record_exception(exc)
            await self._dead_letter(
                data,
                reason=f"unknown_tier: {exc}",
                task_id=task.task_id,
                worker_type=task.worker_type,
            )
            return

        span.set_attribute("task.tier", tier.value)

        # Step 3: Check rate limit for the resolved tier.
        if not self._rate_limiter.try_acquire(tier.value):
            await self._dead_letter(
                data,
                reason=f"rate_limited: tier '{tier.value}' has no available capacity",
                task_id=task.task_id,
                worker_type=task.worker_type,
            )
            return

        # Step 4: Publish to the resolved worker subject.
        subject = f"loom.tasks.{task.worker_type}.{tier.value}"
        route_log = logger.bind(task_id=task.task_id, worker_type=task.worker_type)
        route_log.info(
            "router.task_routed",
            tier=tier.value,
            subject=subject,
        )
        outgoing = task.model_dump(mode="json")
        inject_trace_context(outgoing)
        await self.bus.publish(subject, outgoing)

run async

run() -> None

Connect to NATS and subscribe to the incoming task subject.

After this method returns, the router is actively processing tasks via NATS async callbacks. The caller is responsible for keeping the event loop alive (e.g., via await asyncio.Event().wait()).

The CLI command in cli/main.py handles this::

async def _run():
    await router.run()
    await asyncio.Event().wait()
asyncio.run(_run())

Source code in src/loom/router/router.py
async def run(self) -> None:
    """Connect to NATS and subscribe to the incoming task subject.

    After this method returns, the router is actively processing tasks
    via NATS async callbacks. The caller is responsible for keeping the
    event loop alive (e.g., via ``await asyncio.Event().wait()``).

    The CLI command in cli/main.py handles this::

        async def _run():
            await router.run()
            await asyncio.Event().wait()
        asyncio.run(_run())
    """
    await self.bus.connect()
    self._sub = await self.bus.subscribe("loom.tasks.incoming")
    logger.info("router.running", dead_letter_subject=DEAD_LETTER_SUBJECT)

process_messages async

process_messages() -> None

Process messages from the subscription until cancelled.

Call run() first to connect and subscribe, then call this to enter the message processing loop.

Source code in src/loom/router/router.py
async def process_messages(self) -> None:
    """Process messages from the subscription until cancelled.

    Call run() first to connect and subscribe, then call this to
    enter the message processing loop.
    """
    async for data in self._sub:
        await self.route(data)

Dead Letter

dead_letter

Dead-letter consumer -- stores and replays unroutable/rate-limited tasks.

Subscribes to loom.tasks.dead_letter and maintains a bounded in-memory list of dead-letter entries. Each entry captures the original task data, a timestamp, and the reason for dead-lettering.

The consumer can run standalone (via CLI loom dead-letter monitor) or be embedded in the Workshop app for UI-based inspection and replay.

Replay re-publishes a dead-letter task back to loom.tasks.incoming so the router can attempt routing again.

ReplayRecord

ReplayRecord(entry_id: str, task_id: str | None = None, worker_type: str | None = None, original_reason: str = '')

Record of a replayed dead-letter entry.

Source code in src/loom/router/dead_letter.py
def __init__(
    self,
    entry_id: str,
    task_id: str | None = None,
    worker_type: str | None = None,
    original_reason: str = "",
) -> None:
    self.entry_id = entry_id
    self.task_id = task_id
    self.worker_type = worker_type
    self.original_reason = original_reason
    self.replayed_at = datetime.now(UTC).isoformat()

to_dict

to_dict() -> dict[str, Any]

Serialize to a dict for API/template consumption.

Source code in src/loom/router/dead_letter.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict for API/template consumption."""
    return {
        "entry_id": self.entry_id,
        "task_id": self.task_id,
        "worker_type": self.worker_type,
        "original_reason": self.original_reason,
        "replayed_at": self.replayed_at,
    }

DeadLetterEntry

DeadLetterEntry(original_task: dict[str, Any], reason: str, task_id: str | None = None, worker_type: str | None = None)

A single dead-letter entry with metadata.

Source code in src/loom/router/dead_letter.py
def __init__(
    self,
    original_task: dict[str, Any],
    reason: str,
    task_id: str | None = None,
    worker_type: str | None = None,
) -> None:
    self.id = str(uuid.uuid4())
    self.timestamp = datetime.now(UTC).isoformat()
    self.reason = reason
    self.task_id = task_id
    self.worker_type = worker_type
    self.original_task = original_task

to_dict

to_dict() -> dict[str, Any]

Serialize to a dict for API/template consumption.

Source code in src/loom/router/dead_letter.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict for API/template consumption."""
    return {
        "id": self.id,
        "timestamp": self.timestamp,
        "reason": self.reason,
        "task_id": self.task_id,
        "worker_type": self.worker_type,
        "original_task": self.original_task,
    }

DeadLetterConsumer

DeadLetterConsumer(actor_id: str = 'dead-letter-consumer', max_size: int = 1000, nats_url: str = 'nats://nats:4222', *, bus: MessageBus | None = None)

Bases: BaseActor

Actor that consumes dead-letter messages and stores them in memory.

Subscribes to loom.tasks.dead_letter and maintains a bounded list of entries (most recent first). When the list exceeds max_size, the oldest entries are evicted.

Can be used standalone (via run()) or embedded -- call store() directly to add entries without a subscription (used by the Workshop).

Parameters:

    actor_id (str, default 'dead-letter-consumer'):
        Unique identifier for this actor instance.
    max_size (int, default 1000):
        Maximum number of entries to retain.
    nats_url (str, default 'nats://nats:4222'):
        NATS server URL (used only when no bus is provided).
    bus (MessageBus | None, default None):
        Optional injected message bus (for testing or embedding).
Source code in src/loom/router/dead_letter.py
def __init__(
    self,
    actor_id: str = "dead-letter-consumer",
    max_size: int = 1000,
    nats_url: str = "nats://nats:4222",
    *,
    bus: MessageBus | None = None,
) -> None:
    super().__init__(actor_id=actor_id, nats_url=nats_url, bus=bus)
    self.max_size = max_size
    self._entries: list[DeadLetterEntry] = []
    self._replay_log: list[ReplayRecord] = []

handle_message async

handle_message(data: dict[str, Any]) -> None

Process a dead-letter message from the bus.

Extracts reason, task_id, and worker_type from the dead-letter envelope (as published by TaskRouter._dead_letter), stores the entry, and logs the event.

Source code in src/loom/router/dead_letter.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Process a dead-letter message from the bus.

    Extracts reason, task_id, and worker_type from the dead-letter
    envelope (as published by ``TaskRouter._dead_letter``), stores
    the entry, and logs the event.
    """
    reason = data.get("reason", "unknown")
    task_id = data.get("task_id")
    worker_type = data.get("worker_type")
    original_task = data.get("original_task", data)

    self.store(original_task, reason, task_id=task_id, worker_type=worker_type)

store

store(original_task: dict[str, Any], reason: str, *, task_id: str | None = None, worker_type: str | None = None) -> DeadLetterEntry

Store a dead-letter entry.

Inserts at the front of the list (most recent first) and evicts the oldest entry if max_size is exceeded.

Returns the created entry.

Source code in src/loom/router/dead_letter.py
def store(
    self,
    original_task: dict[str, Any],
    reason: str,
    *,
    task_id: str | None = None,
    worker_type: str | None = None,
) -> DeadLetterEntry:
    """Store a dead-letter entry.

    Inserts at the front of the list (most recent first) and evicts
    the oldest entry if ``max_size`` is exceeded.

    Returns the created entry.
    """
    entry = DeadLetterEntry(
        original_task=original_task,
        reason=reason,
        task_id=task_id,
        worker_type=worker_type,
    )

    # Insert at front (most recent first).
    self._entries.insert(0, entry)

    # Evict oldest if over capacity.
    if len(self._entries) > self.max_size:
        self._entries = self._entries[: self.max_size]

    logger.info(
        "dead_letter.received",
        task_id=task_id,
        worker_type=worker_type,
        reason=reason,
        total_entries=len(self._entries),
    )

    return entry
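The insert-at-front / evict-oldest policy in store() amounts to the following (using ints in place of DeadLetterEntry objects):

```python
entries: list[int] = []
max_size = 3
for task in range(5):              # each int stands in for a DeadLetterEntry
    entries.insert(0, task)        # most recent first
    if len(entries) > max_size:
        entries = entries[:max_size]   # drop the oldest (tail) entries
```

After five stores with max_size=3, only the three most recent entries remain, newest at index 0.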

list_entries

list_entries(limit: int = 50, offset: int = 0) -> list[dict[str, Any]]

Return a page of dead-letter entries (most recent first).

Parameters:

    limit (int, default 50): Maximum number of entries to return.
    offset (int, default 0): Number of entries to skip from the start.

Returns:

    list[dict[str, Any]]: List of entry dicts.

Source code in src/loom/router/dead_letter.py
def list_entries(self, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
    """Return a page of dead-letter entries (most recent first).

    Args:
        limit: Maximum number of entries to return.
        offset: Number of entries to skip from the start.

    Returns:
        List of entry dicts.
    """
    sliced = self._entries[offset : offset + limit]
    return [e.to_dict() for e in sliced]

count

count() -> int

Return the total number of stored entries.

Source code in src/loom/router/dead_letter.py
def count(self) -> int:
    """Return the total number of stored entries."""
    return len(self._entries)

clear

clear() -> None

Remove all stored entries.

Source code in src/loom/router/dead_letter.py
def clear(self) -> None:
    """Remove all stored entries."""
    self._entries.clear()
    logger.info("dead_letter.cleared")

replay async

replay(entry_id: str, bus: MessageBus) -> bool

Re-publish a dead-letter task back to loom.tasks.incoming.

Finds the entry by ID, publishes its original_task to the incoming subject, removes it from the stored list, and records the replay in the audit log.

Parameters:

    entry_id (str, required): The UUID of the entry to replay.
    bus (MessageBus, required): The message bus to publish the replayed task on.

Returns:

    bool: True if the entry was found and replayed, False otherwise.

Source code in src/loom/router/dead_letter.py
async def replay(self, entry_id: str, bus: MessageBus) -> bool:
    """Re-publish a dead-letter task back to ``loom.tasks.incoming``.

    Finds the entry by ID, publishes its ``original_task`` to the
    incoming subject, removes it from the stored list, and records
    the replay in the audit log.

    Args:
        entry_id: The UUID of the entry to replay.
        bus: The message bus to publish the replayed task on.

    Returns:
        True if the entry was found and replayed, False otherwise.
    """
    for i, entry in enumerate(self._entries):
        if entry.id == entry_id:
            await bus.publish(INCOMING_SUBJECT, entry.original_task)
            self._entries.pop(i)

            # Record in audit log
            record = ReplayRecord(
                entry_id=entry_id,
                task_id=entry.task_id,
                worker_type=entry.worker_type,
                original_reason=entry.reason,
            )
            self._replay_log.append(record)

            logger.info(
                "dead_letter.replayed",
                entry_id=entry_id,
                task_id=entry.task_id,
                worker_type=entry.worker_type,
            )
            return True
    return False

replay_log

replay_log(limit: int = 50) -> list[dict[str, Any]]

Return the replay audit log (most recent first).

Parameters:

    limit (int, default 50): Maximum number of records to return.

Returns:

    list[dict[str, Any]]: List of replay record dicts.

Source code in src/loom/router/dead_letter.py
def replay_log(self, limit: int = 50) -> list[dict[str, Any]]:
    """Return the replay audit log (most recent first).

    Args:
        limit: Maximum number of records to return.

    Returns:
        List of replay record dicts.
    """
    # Most recent last in list, so reverse for display
    return [r.to_dict() for r in reversed(self._replay_log[-limit:])]
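The slice-and-reverse in replay_log works because records are appended in order, so the newest live at the tail of the list:

```python
log = ["r1", "r2", "r3", "r4"]              # appended oldest-to-newest
limit = 2
page = [r for r in reversed(log[-limit:])]  # take the newest `limit`, newest first
```

This contrasts with the dead-letter entry list, which inserts at the front and therefore needs no reversal when listing.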

replay_count

replay_count() -> int

Return the total number of replayed entries.

Source code in src/loom/router/dead_letter.py
def replay_count(self) -> int:
    """Return the total number of replayed entries."""
    return len(self._replay_log)