Message Bus¶
Base¶
base ¶
Abstract message bus interface for Loom actor communication.
All inter-actor communication flows through a MessageBus implementation. The default implementation is NATSBus (nats_adapter.py), but the abstraction allows alternative transports for testing (InMemoryBus) or portability.
Subscriptions are iterator-based: subscribe() returns a Subscription that
yields parsed message dicts via async for data in subscription:.
Subscription ¶
Bases: ABC
An active subscription to a message bus subject.
Yields parsed message dicts (not raw bytes) when iterated. Must be unsubscribed when no longer needed.
Usage::

    sub = await bus.subscribe("loom.tasks.incoming")
    async for data in sub:
        await handle(data)
    await sub.unsubscribe()
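Because a Subscription must be unsubscribed when it is no longer needed, one option is to wrap it in an async context manager so that cleanup also runs when the handler raises. The following is a minimal sketch, not part of Loom: the `subscribed` and `consume` helpers are hypothetical, and the code assumes only the `bus.subscribe(subject, queue_group=None)` and `sub.unsubscribe()` calls described on this page.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def subscribed(bus, subject, queue_group=None):
    """Hypothetical helper: subscribe on entry, always unsubscribe on exit."""
    sub = await bus.subscribe(subject, queue_group=queue_group)
    try:
        yield sub
    finally:
        # Runs on normal exit, on break, and when the handler raises.
        await sub.unsubscribe()


async def consume(bus, handle):
    """Hypothetical consumer loop built on the helper above."""
    async with subscribed(bus, "loom.tasks.incoming") as sub:
        async for data in sub:
            await handle(data)
```

Compared with the plain usage pattern, the `finally` clause guarantees that the subscription is released even if `handle` fails partway through.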
unsubscribe (abstractmethod, async) ¶
MessageBus ¶
Bases: ABC
Abstract message bus for inter-actor communication.
Implementations must provide:
- Publish/subscribe with subject-based routing
- JSON serialization/deserialization (callers pass dicts, not bytes)
- Optional queue groups for competing-consumer load balancing
The bus does NOT guarantee delivery. If no subscriber is listening, published messages may be silently dropped (fire-and-forget semantics). Implementations that need persistence should layer it on top.
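The shape of the two abstract classes can be sketched as follows. This is an illustration under stated assumptions rather than the contents of src/loom/bus/base.py: only the `subscribe(subject, queue_group=None)` parameters are documented on this page, so the `publish(subject, data)` signature, the return annotations, and declaring `__anext__` as abstract are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class Subscription(ABC):
    """Async-iterable subscription that yields parsed message dicts."""

    def __aiter__(self) -> "Subscription":
        return self

    @abstractmethod
    async def __anext__(self) -> Dict[str, Any]:
        """Return the next parsed message (assumed to be abstract here)."""

    @abstractmethod
    async def unsubscribe(self) -> None:
        """Stop receiving messages and release resources."""


class MessageBus(ABC):
    """Transport-agnostic bus; callers pass dicts, never bytes."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

    @abstractmethod
    async def publish(self, subject: str, data: Dict[str, Any]) -> None:
        """Fire-and-forget publish (signature is an assumption)."""

    @abstractmethod
    async def subscribe(
        self, subject: str, queue_group: Optional[str] = None
    ) -> Subscription: ...
```

Because every method is abstract, a transport that omits one of them cannot be instantiated, which keeps alternative buses interchangeable behind the same interface.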
connect (abstractmethod, async) ¶
close (abstractmethod, async) ¶
publish (abstractmethod, async) ¶
subscribe (abstractmethod, async) ¶
Subscribe to a subject, returning an async-iterable Subscription.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subject | str | The subject to subscribe to. | required |
| queue_group | str \| None | Optional queue group name. When multiple subscribers share a queue group, each message is delivered to exactly one member (competing consumers for load balancing). | None |
Returns:
| Type | Description |
|---|---|
| Subscription | A Subscription that yields parsed message dicts. |
Source code in src/loom/bus/base.py
NATS Adapter¶
nats_adapter ¶
NATS message bus adapter — the default transport layer for Loom communication.
All inter-actor communication flows through this adapter. Actors never touch NATS directly; they use the MessageBus interface (or BaseActor's publish/subscribe wrappers, which delegate here).
Subject naming convention
- loom.tasks.incoming: Router's inbox (all task dispatch goes here first)
- loom.tasks.{worker_type}.{tier}: Worker queues (router publishes here)
- loom.results.{goal_id}: Results routed back to orchestrators
- loom.results.default: Results with no parent_task_id
- loom.goals.incoming: Pipeline orchestrator's inbox
- loom.control.{actor_id}: Control messages (shutdown, status) [not yet used]
- loom.events: System-wide events (logging, metrics) [not yet used]
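The templated subjects in the convention above can be built with small helpers. This is a hypothetical sketch: the function names and the example values (`summarizer`, `local`, `g-42`) do not come from Loom, and falling back to `loom.results.default` when no goal id is given is a simplification of the "no parent_task_id" rule. The token check reflects that `.` separates NATS subject tokens and that `*` and `>` are wildcards.

```python
from typing import Optional


def _token(value: str) -> str:
    """Reject values that would change the subject's structure."""
    if not value or any(ch in value for ch in ".*> \t\r\n"):
        raise ValueError(f"invalid subject token: {value!r}")
    return value


def worker_subject(worker_type: str, tier: str) -> str:
    """Subject the router publishes to for a given worker queue."""
    return f"loom.tasks.{_token(worker_type)}.{_token(tier)}"


def results_subject(goal_id: Optional[str] = None) -> str:
    """Results subject; falls back to the default subject (assumption)."""
    if goal_id is None:
        return "loom.results.default"
    return f"loom.results.{_token(goal_id)}"


def control_subject(actor_id: str) -> str:
    """Control subject for one actor (documented as not yet used)."""
    return f"loom.control.{_token(actor_id)}"
```

For example, `worker_subject("summarizer", "local")` yields `loom.tasks.summarizer.local`, and `results_subject("g-42")` yields `loom.results.g-42`.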
Connection defaults
reconnect_time_wait=1s and max_reconnect_attempts=60 give roughly 60s of retrying in total. If NATS stays down longer than that, the actor crashes and must be restarted. Disconnect and reconnect events are logged for operational visibility.
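These defaults map onto nats-py connection options roughly as sketched below. This is not the adapter's actual code: the logger name, the helper names, and the default URL are assumptions. The `reconnect_time_wait`, `max_reconnect_attempts`, `disconnected_cb`, and `reconnected_cb` keyword arguments are standard nats-py connection options.

```python
import logging

logger = logging.getLogger("loom.bus.sketch")  # hypothetical logger name

RECONNECT_TIME_WAIT_S = 1    # seconds between reconnect attempts
MAX_RECONNECT_ATTEMPTS = 60  # 60 attempts x 1s is roughly 60s of retry


def connect_options(url: str) -> dict:
    """Build keyword arguments for nats.connect() matching the defaults above."""

    async def on_disconnect() -> None:
        logger.warning("Disconnected from NATS at %s", url)

    async def on_reconnect() -> None:
        logger.info("Reconnected to NATS at %s", url)

    return {
        "servers": [url],
        "reconnect_time_wait": RECONNECT_TIME_WAIT_S,
        "max_reconnect_attempts": MAX_RECONNECT_ATTEMPTS,
        "disconnected_cb": on_disconnect,
        "reconnected_cb": on_reconnect,
    }


async def connect(url: str = "nats://localhost:4222"):
    """Open a connection with the options above (URL is an assumed default)."""
    import nats  # nats-py; imported lazily so the options can be inspected without it

    return await nats.connect(**connect_options(url))
```

Keeping the options in a plain dict makes the retry budget (wait time multiplied by attempts) easy to check in a unit test without a running server.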
Delivery semantics
At-most-once. If no subscriber is listening when a message is published, the message is silently dropped. NATS JetStream would add persistence but is not yet configured.
All messages are JSON-serialized dicts. Binary payloads are not supported.
Large data should be passed via file references (workspace directory), not inline in messages.
NATSSubscription ¶
Bases: Subscription
Wraps a nats-py subscription as an async iterator of parsed dicts.
Source code in src/loom/bus/nats_adapter.py
unsubscribe (async) ¶
__anext__ (async) ¶
Yield the next message, JSON-decoded.
Blocks until a message arrives. Raises StopAsyncIteration when the underlying NATS subscription is drained or closed.
Malformed (non-JSON) messages are logged and skipped — the subscription continues processing subsequent messages rather than terminating.
Source code in src/loom/bus/nats_adapter.py
NATSBus ¶
Bases: MessageBus
NATS-backed MessageBus implementation.
Provides three messaging patterns:
- publish(): Fire-and-forget (tasks, results)
- subscribe(): Async iterator with optional queue groups for load balancing
- request(): Request-reply for synchronous-style calls (not yet used by any actor)
Delivery semantics: at-most-once. Messages published with no active subscriber are silently dropped.
Source code in src/loom/bus/nats_adapter.py
connect (async) ¶
Connect to the NATS server with reconnection and event logging.
Reconnection: 1s interval, up to 60 attempts (~60s of total retry). Disconnect/reconnect events are logged for operational visibility.
Source code in src/loom/bus/nats_adapter.py
close (async) ¶
publish (async) ¶
Publish a JSON-serialized dict to a NATS subject.
NOTE: No delivery guarantee — if no subscriber is listening, the message is silently dropped. NATS JetStream would add persistence but is not yet configured.
Source code in src/loom/bus/nats_adapter.py
subscribe (async) ¶
Subscribe to a subject, returning an async-iterable NATSSubscription.
Queue group enables competing consumers for horizontal scaling.
Source code in src/loom/bus/nats_adapter.py
request (async) ¶
Request-reply pattern for synchronous-style calls.
NOTE: Not currently used by any Loom actor. Available for future use cases like health checks or synchronous worker queries. Raises nats.errors.TimeoutError if no reply within timeout.
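A request-reply call over JSON can be sketched as below. The `request_json` helper is hypothetical and is not the signature of NATSBus.request(), which this page does not document. The `nc` argument is assumed to be a connected nats-py client, whose `request(subject, payload, timeout=...)` coroutine returns a message carrying the reply bytes in `.data`.

```python
import json
from typing import Any, Dict


async def request_json(
    nc, subject: str, data: Dict[str, Any], timeout: float = 2.0
) -> Dict[str, Any]:
    """Send a JSON request and decode the JSON reply.

    `nc` is assumed to be a connected nats-py client. With a real client,
    nats.errors.TimeoutError propagates if no reply arrives within `timeout`.
    """
    payload = json.dumps(data).encode("utf-8")
    reply = await nc.request(subject, payload, timeout=timeout)
    return json.loads(reply.data)
```

A caller that wants to treat a missing reply as a soft failure would wrap the call in `try`/`except nats.errors.TimeoutError`.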
Source code in src/loom/bus/nats_adapter.py
Memory Bus¶
memory ¶
In-memory message bus for testing and local development.
Provides the same MessageBus interface as NATSBus but routes messages through in-process async queues. No external infrastructure required.
Limitations compared to NATSBus:
- Single process only (no inter-process communication)
- No persistence (messages lost on close)
- Simplified queue group semantics (round-robin among group members)
- No wildcard subject matching
This is intentionally simple. Use NATSBus for production.
InMemorySubscription ¶
Bases: Subscription
Subscription backed by an asyncio.Queue.
Source code in src/loom/bus/memory.py
unsubscribe (async) ¶
InMemoryBus ¶
Bases: MessageBus
In-memory message bus for testing.
Messages published to a subject are delivered to all active subscribers on that subject. Queue groups are supported: within a group, messages are delivered to one member via round-robin.
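The delivery rule described above (fan-out to plain subscribers, round-robin within each queue group) can be sketched as follows. This is an illustrative toy, not the code in src/loom/bus/memory.py: the `SketchBus` name and its internals are assumptions, and `subscribe` returns the raw asyncio.Queue rather than a Subscription to keep the example short.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, Optional


class SketchBus:
    """Toy in-process bus: exact subject match, no wildcards, no persistence."""

    def __init__(self) -> None:
        self._plain = defaultdict(list)  # subject -> [queue]
        self._groups = defaultdict(lambda: defaultdict(list))  # subject -> group -> [queue]
        self._cursor = defaultdict(int)  # (subject, group) -> next member index

    async def subscribe(
        self, subject: str, queue_group: Optional[str] = None
    ) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if queue_group is None:
            self._plain[subject].append(queue)
        else:
            self._groups[subject][queue_group].append(queue)
        return queue

    async def publish(self, subject: str, data: Dict[str, Any]) -> None:
        # Every plain subscriber on the subject receives the message.
        for queue in self._plain.get(subject, []):
            queue.put_nowait(data)
        # Each queue group receives it once, rotating through its members.
        for group, members in self._groups.get(subject, {}).items():
            index = self._cursor[(subject, group)]
            members[index % len(members)].put_nowait(data)
            self._cursor[(subject, group)] = index + 1
```

Publishing to a subject with no subscribers simply does nothing, which mirrors the fire-and-forget semantics of the MessageBus contract.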
Source code in src/loom/bus/memory.py
connect (async) ¶
close (async) ¶
Close the bus and unsubscribe all active subscriptions.
publish (async) ¶
Publish a message to all subscribers on the given subject.
Source code in src/loom/bus/memory.py
subscribe (async) ¶
Subscribe to a subject, optionally with a queue group.