Skip to content

Message Bus

Base

base

Abstract message bus interface for Loom actor communication.

All inter-actor communication flows through a MessageBus implementation. The default implementation is NATSBus (nats_adapter.py), but the abstraction allows alternative transports for testing (InMemoryBus) or portability.

Subscriptions are iterator-based: subscribe() returns a Subscription that yields parsed message dicts via async for data in subscription:.

Subscription

Bases: ABC

An active subscription to a message bus subject.

Yields parsed message dicts (not raw bytes) when iterated. Must be unsubscribed when no longer needed.

Usage::

sub = await bus.subscribe("loom.tasks.incoming")
async for data in sub:
    await handle(data)
await sub.unsubscribe()

unsubscribe abstractmethod async

unsubscribe() -> None

Stop receiving messages and release resources.

Source code in src/loom/bus/base.py
@abstractmethod
async def unsubscribe(self) -> None:
    """Stop receiving messages and release resources."""
    ...

__anext__ abstractmethod async

__anext__() -> dict[str, Any]

Yield the next message as a parsed dict.

Source code in src/loom/bus/base.py
@abstractmethod
async def __anext__(self) -> dict[str, Any]:
    """Yield the next message as a parsed dict."""
    ...

MessageBus

Bases: ABC

Abstract message bus for inter-actor communication.

Implementations must provide: - Publish/subscribe with subject-based routing - JSON serialization/deserialization (callers pass dicts, not bytes) - Optional queue groups for competing-consumer load balancing

The bus does NOT guarantee delivery. If no subscriber is listening, published messages may be silently dropped (fire-and-forget semantics). Implementations that need persistence should layer it on top.

connect abstractmethod async

connect() -> None

Establish connection to the message transport.

Source code in src/loom/bus/base.py
@abstractmethod
async def connect(self) -> None:
    """Establish connection to the message transport."""
    ...

close abstractmethod async

close() -> None

Gracefully disconnect, draining pending publishes.

Source code in src/loom/bus/base.py
@abstractmethod
async def close(self) -> None:
    """Gracefully disconnect, draining pending publishes."""
    ...

publish abstractmethod async

publish(subject: str, data: dict[str, Any]) -> None

Publish a message dict to a subject (fire-and-forget).

Source code in src/loom/bus/base.py
@abstractmethod
async def publish(self, subject: str, data: dict[str, Any]) -> None:
    """Publish a message dict to a subject (fire-and-forget)."""
    ...

subscribe abstractmethod async

subscribe(subject: str, queue_group: str | None = None) -> Subscription

Subscribe to a subject, returning an async-iterable Subscription.

Parameters:

Name Type Description Default
subject str

The subject to subscribe to.

required
queue_group str | None

Optional queue group name. When multiple subscribers share a queue group, each message is delivered to exactly one member (competing consumers for load balancing).

None

Returns:

Type Description
Subscription

A Subscription that yields parsed message dicts.

Source code in src/loom/bus/base.py
@abstractmethod
async def subscribe(
    self,
    subject: str,
    queue_group: str | None = None,
) -> Subscription:
    """Subscribe to a subject, returning an async-iterable Subscription.

    Args:
        subject: The subject to subscribe to.
        queue_group: Optional queue group name. When multiple subscribers
            share a queue group, each message is delivered to exactly one
            member (competing consumers for load balancing).

    Returns:
        A Subscription that yields parsed message dicts.
    """
    ...

NATS Adapter

nats_adapter

NATS message bus adapter — the default transport layer for Loom communication.

All inter-actor communication flows through this adapter. Actors never touch NATS directly; they use the MessageBus interface (or BaseActor's publish/subscribe wrappers, which delegate here).

Subject naming convention

loom.tasks.incoming — Router's inbox (all task dispatch goes here first) loom.tasks.{worker_type}.{tier} — Worker queues (router publishes here) loom.results.{goal_id} — Results routed back to orchestrators loom.results.default — Results with no parent_task_id loom.goals.incoming — Pipeline orchestrator's inbox loom.control.{actor_id} — Control messages (shutdown, status) [not yet used] loom.events — System-wide events (logging, metrics) [not yet used]

Connection defaults

reconnect_time_wait=1s, max_reconnect_attempts=60 — totals ~60s of retry. If NATS is down longer than that, the actor will crash and needs restart. Disconnect and reconnect events are logged for operational visibility.

Delivery semantics

At-most-once. If no subscriber is listening when a message is published, the message is silently dropped. NATS JetStream would add persistence but is not yet configured.

All messages are JSON-serialized dicts. Binary payloads are not supported.

Large data should be passed via file references (workspace directory), not inline in messages.

NATSSubscription

NATSSubscription(nats_sub: Any)

Bases: Subscription

Wraps a nats-py subscription as an async iterator of parsed dicts.

Source code in src/loom/bus/nats_adapter.py
def __init__(self, nats_sub: Any) -> None:
    self._sub = nats_sub

unsubscribe async

unsubscribe() -> None

Unsubscribe from the underlying NATS subscription.

Source code in src/loom/bus/nats_adapter.py
async def unsubscribe(self) -> None:
    """Unsubscribe from the underlying NATS subscription."""
    await self._sub.unsubscribe()

__anext__ async

__anext__() -> dict[str, Any]

Yield the next message, JSON-decoded.

Blocks until a message arrives. Raises StopAsyncIteration when the underlying NATS subscription is drained or closed.

Malformed (non-JSON) messages are logged and skipped — the subscription continues processing subsequent messages rather than terminating.

Source code in src/loom/bus/nats_adapter.py
async def __anext__(self) -> dict[str, Any]:
    """Yield the next message, JSON-decoded.

    Blocks until a message arrives. Raises StopAsyncIteration when the
    underlying NATS subscription is drained or closed.

    Malformed (non-JSON) messages are logged and skipped — the
    subscription continues processing subsequent messages rather than
    terminating.
    """
    while True:
        try:
            msg = await self._sub.next_msg(timeout=None)
        except Exception as e:
            logger.error(
                "nats.subscription_error",
                error=str(e),
                error_type=type(e).__name__,
            )
            raise StopAsyncIteration from e

        try:
            return json.loads(msg.data.decode())
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.warning(
                "nats.malformed_message_skipped",
                error=str(e),
                error_type=type(e).__name__,
                subject=msg.subject,
                data_length=len(msg.data),
            )
            # Skip this message and wait for the next one.
            continue

NATSBus

NATSBus(url: str = 'nats://nats:4222')

Bases: MessageBus

NATS-backed MessageBus implementation.

Provides three messaging patterns: - publish(): Fire-and-forget (tasks, results) - subscribe(): Async iterator with optional queue groups for load balancing - request(): Request-reply for synchronous-style calls (not yet used by any actor)

Delivery semantics: at-most-once. Messages published with no active subscriber are silently dropped.

Source code in src/loom/bus/nats_adapter.py
def __init__(self, url: str = "nats://nats:4222") -> None:
    self.url = url
    self._nc: NATSClient | None = None

connect async

connect() -> None

Connect to the NATS server with reconnection and event logging.

Reconnection: 1s interval, up to 60 attempts (~60s of total retry). Disconnect/reconnect events are logged for operational visibility.

Source code in src/loom/bus/nats_adapter.py
async def connect(self) -> None:
    """Connect to the NATS server with reconnection and event logging.

    Reconnection: 1s interval, up to 60 attempts (~60s of total retry).
    Disconnect/reconnect events are logged for operational visibility.
    """

    async def _on_reconnect(_nc: Any) -> None:
        logger.info("bus.reconnected", url=self.url)

    async def _on_disconnect(_nc: Any) -> None:
        logger.warning("bus.disconnected", url=self.url)

    self._nc = await nats.connect(
        self.url,
        reconnect_time_wait=_RECONNECT_INITIAL_WAIT,
        max_reconnect_attempts=_RECONNECT_MAX_ATTEMPTS,
        reconnected_cb=_on_reconnect,
        disconnected_cb=_on_disconnect,
    )
    logger.info("bus.connected", url=self.url)

close async

close() -> None

Drain and close the NATS connection.

Source code in src/loom/bus/nats_adapter.py
async def close(self) -> None:
    """Drain and close the NATS connection."""
    if self._nc:
        await self._nc.drain()

publish async

publish(subject: str, data: dict[str, Any]) -> None

Publish a JSON-serialized dict to a NATS subject.

NOTE: No delivery guarantee — if no subscriber is listening, the message is silently dropped. NATS JetStream would add persistence but is not yet configured.

Source code in src/loom/bus/nats_adapter.py
async def publish(self, subject: str, data: dict[str, Any]) -> None:
    """Publish a JSON-serialized dict to a NATS subject.

    NOTE: No delivery guarantee — if no subscriber is listening,
    the message is silently dropped. NATS JetStream would add
    persistence but is not yet configured.
    """
    await self._nc.publish(subject, json.dumps(data).encode())

subscribe async

subscribe(subject: str, queue_group: str | None = None) -> NATSSubscription

Subscribe to a subject, returning an async-iterable NATSSubscription.

Queue group enables competing consumers for horizontal scaling.

Source code in src/loom/bus/nats_adapter.py
async def subscribe(
    self,
    subject: str,
    queue_group: str | None = None,
) -> NATSSubscription:
    """Subscribe to a subject, returning an async-iterable NATSSubscription.

    Queue group enables competing consumers for horizontal scaling.
    """
    if queue_group:
        nats_sub = await self._nc.subscribe(subject, queue=queue_group)
    else:
        nats_sub = await self._nc.subscribe(subject)
    return NATSSubscription(nats_sub)

request async

request(subject: str, data: dict[str, Any], timeout: float = 30.0) -> dict

Request-reply pattern for synchronous-style calls.

NOTE: Not currently used by any Loom actor. Available for future use cases like health checks or synchronous worker queries. Raises nats.errors.TimeoutError if no reply within timeout.

Source code in src/loom/bus/nats_adapter.py
async def request(self, subject: str, data: dict[str, Any], timeout: float = 30.0) -> dict:
    """Request-reply pattern for synchronous-style calls.

    NOTE: Not currently used by any Loom actor. Available for future
    use cases like health checks or synchronous worker queries.
    Raises nats.errors.TimeoutError if no reply within timeout.
    """
    resp = await self._nc.request(
        subject,
        json.dumps(data).encode(),
        timeout=timeout,
    )
    return json.loads(resp.data.decode())

Memory Bus

memory

In-memory message bus for testing and local development.

Provides the same MessageBus interface as NATSBus but routes messages through in-process async queues. No external infrastructure required.

Limitations compared to NATSBus: - Single process only (no inter-process communication) - No persistence (messages lost on close) - Simplified queue group semantics (round-robin among group members) - No wildcard subject matching

This is intentionally simple. Use NATSBus for production.

InMemorySubscription

InMemorySubscription(subject: str)

Bases: Subscription

Subscription backed by an asyncio.Queue.

Source code in src/loom/bus/memory.py
def __init__(self, subject: str) -> None:
    self.subject = subject
    self._queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
    self._active = True

unsubscribe async

unsubscribe() -> None

Unsubscribe and unblock any waiting consumer.

Source code in src/loom/bus/memory.py
async def unsubscribe(self) -> None:
    """Unsubscribe and unblock any waiting consumer."""
    self._active = False
    # Push a sentinel so any waiting __anext__ unblocks.
    await self._queue.put(None)

InMemoryBus

InMemoryBus()

Bases: MessageBus

In-memory message bus for testing.

Messages published to a subject are delivered to all active subscribers on that subject. Queue groups are supported: within a group, messages are delivered to one member via round-robin.

Source code in src/loom/bus/memory.py
def __init__(self) -> None:
    # subject -> list of (queue_group | None, subscription)
    self._subscribers: dict[str, list[tuple[str | None, InMemorySubscription]]] = defaultdict(
        list
    )
    # subject -> queue_group -> round-robin counter
    self._group_counters: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    self._connected = False

connect async

connect() -> None

Mark the bus as connected.

Source code in src/loom/bus/memory.py
async def connect(self) -> None:
    """Mark the bus as connected."""
    self._connected = True

close async

close() -> None

Close the bus and unsubscribe all active subscriptions.

Source code in src/loom/bus/memory.py
async def close(self) -> None:
    """Close the bus and unsubscribe all active subscriptions."""
    self._connected = False
    for subs in self._subscribers.values():
        for _, sub in subs:
            await sub.unsubscribe()
    self._subscribers.clear()

publish async

publish(subject: str, data: dict[str, Any]) -> None

Publish a message to all subscribers on the given subject.

Source code in src/loom/bus/memory.py
async def publish(self, subject: str, data: dict[str, Any]) -> None:
    """Publish a message to all subscribers on the given subject."""
    subs = self._subscribers.get(subject, [])
    if not subs:
        return

    # Partition into ungrouped and grouped subscribers.
    ungrouped = [(g, s) for g, s in subs if g is None and s._active]
    grouped: dict[str, list[InMemorySubscription]] = defaultdict(list)
    for group, sub in subs:
        if group is not None and sub._active:
            grouped[group].append(sub)

    # Deliver to all ungrouped subscribers.
    for _, sub in ungrouped:
        await sub._deliver(data)

    # Deliver to one member per queue group (round-robin).
    for group, members in grouped.items():
        if not members:
            continue
        idx = self._group_counters[subject][group] % len(members)
        await members[idx]._deliver(data)
        self._group_counters[subject][group] += 1

subscribe async

subscribe(subject: str, queue_group: str | None = None) -> InMemorySubscription

Subscribe to a subject, optionally with a queue group.

Source code in src/loom/bus/memory.py
async def subscribe(
    self,
    subject: str,
    queue_group: str | None = None,
) -> InMemorySubscription:
    """Subscribe to a subject, optionally with a queue group."""
    sub = InMemorySubscription(subject)
    self._subscribers[subject].append((queue_group, sub))
    return sub