Skip to content

MCP

The MCP gateway uses FastMCP 3.x to expose LOOM workers, pipelines, query backends, Workshop operations, and session management as MCP tools.

Server

server

MCP server assembly — wires config, discovery, bridge, and resources.

Creates a fully configured FastMCP server from a LOOM MCP gateway config YAML. The server exposes LOOM workers, pipelines, query backends, and Workshop operations as MCP tools, and workspace files as MCP resources.

Usage::

from loom.mcp.server import create_server, run_stdio

mcp, gateway = create_server("configs/mcp/docman.yaml")
run_stdio(mcp, gateway)
See Also

loom.mcp.config — config loading and validation loom.mcp.discovery — tool definition generation loom.mcp.bridge — NATS call dispatch loom.mcp.resources — workspace resource exposure loom.mcp.workshop_discovery — Workshop tool definitions loom.mcp.workshop_bridge — Workshop direct dispatch

ToolEntry dataclass

ToolEntry(name: str, kind: str, tool_def: dict[str, Any], loom_meta: dict[str, Any])

Registry entry linking an MCP tool name to its dispatch info.

MCPGateway dataclass

MCPGateway(config: dict[str, Any], bridge: MCPBridge, tool_registry: dict[str, ToolEntry] = dict(), tool_defs: list[dict[str, Any]] = list(), resources: WorkspaceResources | None = None, workshop_bridge: WorkshopBridge | None = None, session_bridge: SessionBridge | None = None, requires_bus: bool = True)

Holds all state for a running MCP gateway.

create_server

create_server(config_path: str) -> tuple[FastMCPType, MCPGateway]

Create a FastMCP server and MCPGateway from a config file.

Returns:

A tuple of (FastMCP, MCPGateway): the configured FastMCP server and the gateway holding its state. The gateway must be connected before the server can handle calls.

Source code in src/loom/mcp/server.py
def create_server(config_path: str) -> tuple[FastMCPType, MCPGateway]:
    """Create a FastMCP server and MCPGateway from a config file.

    Assembly is split into small private helpers so each concern
    (bus-tool discovery, session bridge, registry, resources) reads
    independently.

    Args:
        config_path: Path to an MCP gateway config YAML.

    Returns:
        Tuple of (FastMCP, MCPGateway).
        The gateway must be connected before the server can handle calls.
    """
    from fastmcp import FastMCP

    config = load_mcp_config(config_path)
    nats_url = config.get("nats_url", "nats://nats:4222")
    bridge = MCPBridge(NATSBus(nats_url))

    tools_config = config.get("tools", {})
    # The NATS bus is only required when at least one tool dispatches over
    # it; workshop and session tools run in-process.
    requires_bus = bool(
        tools_config.get("workers") or tools_config.get("pipelines") or tools_config.get("queries")
    )

    all_tools = _discover_bus_tools(tools_config)

    # Workshop tools (optional — only if tools.workshop is present).
    workshop_config = tools_config.get("workshop")
    workshop_bridge: WorkshopBridge | None = None
    if workshop_config is not None:
        all_tools.extend(discover_workshop_tools(workshop_config))
        workshop_bridge = _build_workshop_bridge(
            workshop_config,
            replay_bus=bridge.bus if requires_bus else None,
        )

    # Session tools (optional — only if tools.session is present).
    session_config = tools_config.get("session")
    session_bridge: SessionBridge | None = None
    if session_config is not None:
        all_tools.extend(discover_session_tools(session_config))
        session_bridge = _make_session_bridge(session_config, nats_url)

    registry, mcp_tool_defs = _build_tool_registry(all_tools)

    logger.info(
        "mcp.server.tools_discovered",
        count=len(registry),
        tools=sorted(registry.keys()),
    )

    workspace_resources = _make_workspace_resources(config.get("resources"))

    gateway = MCPGateway(
        config=config,
        bridge=bridge,
        tool_registry=registry,
        tool_defs=mcp_tool_defs,
        resources=workspace_resources,
        workshop_bridge=workshop_bridge,
        session_bridge=session_bridge,
        requires_bus=requires_bus,
    )

    # --- Build FastMCP Server ---
    mcp = FastMCP(
        name=config["name"],
        instructions=config.get("description"),
    )

    # Register each discovered tool dynamically.
    for entry in registry.values():
        _register_tool(mcp, gateway, entry)

    # Register workspace resources.
    if workspace_resources:
        _register_resources(mcp, gateway)

    # Health endpoint (available in HTTP transport).
    _register_health(mcp, gateway)

    return mcp, gateway


def _discover_bus_tools(tools_config: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect tool definitions for all NATS-dispatched tool kinds."""
    tools: list[dict[str, Any]] = []
    tools.extend(discover_worker_tools(tools_config.get("workers", [])))
    tools.extend(discover_pipeline_tools(tools_config.get("pipelines", [])))
    tools.extend(discover_query_tools(tools_config.get("queries", [])))
    return tools


def _make_session_bridge(session_config: dict[str, Any], nats_url: str) -> SessionBridge:
    """Build a SessionBridge from the ``tools.session`` config section.

    Falls back to environment variables (ITP_ROOT, BAFT_WORKSPACE,
    OLLAMA_URL) when the config omits a directory or URL.
    """
    return SessionBridge(
        framework_dir=session_config.get(
            "framework_dir",
            os.environ.get("ITP_ROOT", ".") + "/framework",
        ),
        workspace_dir=session_config.get(
            "workspace_dir",
            os.environ.get("BAFT_WORKSPACE", "./itp-workspace"),
        ),
        baft_dir=session_config.get("baft_dir"),
        nats_url=session_config.get("nats_url", nats_url),
        ollama_url=session_config.get(
            "ollama_url",
            os.environ.get("OLLAMA_URL", "http://localhost:11434"),
        ),
    )


def _build_tool_registry(
    all_tools: list[dict[str, Any]],
) -> tuple[dict[str, ToolEntry], list[dict[str, Any]]]:
    """Index tool definitions by name, splitting off LOOM dispatch metadata.

    NOTE: pops the ``_loom`` key from each tool dict in place, leaving a
    clean MCP tool definition behind.
    """
    registry: dict[str, ToolEntry] = {}
    tool_defs: list[dict[str, Any]] = []
    for tool in all_tools:
        loom_meta = tool.pop("_loom", {})
        registry[tool["name"]] = ToolEntry(
            name=tool["name"],
            kind=loom_meta.get("kind", "unknown"),
            tool_def=tool,
            loom_meta=loom_meta,
        )
        tool_defs.append(tool)
    return registry, tool_defs


def _make_workspace_resources(
    resources_config: dict[str, Any] | None,
) -> WorkspaceResources | None:
    """Build WorkspaceResources from the ``resources`` config section, if any."""
    if not resources_config:
        return None
    return WorkspaceResources(
        workspace_dir=resources_config["workspace_dir"],
        patterns=resources_config.get("patterns"),
    )

run_stdio

run_stdio(server: FastMCP, gateway: MCPGateway) -> None

Run the MCP server on stdio transport (blocking).

Source code in src/loom/mcp/server.py
def run_stdio(server: FastMCPType, gateway: MCPGateway) -> None:
    """Run the MCP server on stdio transport (blocking)."""

    async def _serve() -> None:
        opened_bridge = False
        if gateway.requires_bus:
            await gateway.bridge.connect()
            opened_bridge = True
            logger.info(
                "mcp.gateway.connected",
                nats_url=gateway.config.get("nats_url"),
            )

        # Prime the mtime cache before serving, if resources are configured.
        if gateway.resources:
            gateway.resources.snapshot()

        try:
            await server.run_async(transport="stdio")
        finally:
            if opened_bridge:
                await gateway.bridge.close()

    asyncio.run(_serve())

run_streamable_http

run_streamable_http(server: FastMCP, gateway: MCPGateway, host: str = '127.0.0.1', port: int = 8000) -> None

Run the MCP server on streamable HTTP transport (blocking).

FastMCP handles all Starlette/uvicorn setup internally.

Source code in src/loom/mcp/server.py
def run_streamable_http(
    server: FastMCPType,
    gateway: MCPGateway,
    host: str = "127.0.0.1",
    port: int = 8000,
) -> None:
    """Run the MCP server on streamable HTTP transport (blocking).

    FastMCP handles all Starlette/uvicorn setup internally.
    """

    async def _serve() -> None:
        opened_bridge = False
        if gateway.requires_bus:
            await gateway.bridge.connect()
            opened_bridge = True
            logger.info(
                "mcp.gateway.connected",
                nats_url=gateway.config.get("nats_url"),
            )

        # Prime the mtime cache before serving, if resources are configured.
        if gateway.resources:
            gateway.resources.snapshot()

        try:
            await server.run_async(
                transport="http",
                host=host,
                port=port,
            )
        finally:
            if opened_bridge:
                await gateway.bridge.close()

    asyncio.run(_serve())

Bridge

bridge

MCP-to-NATS call bridge.

Converts MCP tool invocations into LOOM messages (TaskMessage or OrchestratorGoal), publishes them to the NATS bus, waits for the corresponding TaskResult, and returns the output.

The bridge is transport-agnostic — it accepts any MessageBus implementation (NATSBus for production, InMemoryBus for testing).

Three call patterns:

  • call_worker — direct worker dispatch via loom.tasks.incoming
  • call_pipeline — pipeline goal via loom.goals.incoming
  • call_query — worker dispatch with action field in payload

BridgeError

Bases: Exception

Raised when a bridge call fails.

BridgeTimeoutError

Bases: BridgeError

Raised when a bridge call times out.

MCPBridge

MCPBridge(bus: MessageBus)

Bridges MCP tool calls to the LOOM actor mesh via NATS.

Holds a MessageBus connection and provides three dispatch methods corresponding to the three MCP tool kinds: worker, pipeline, query.

Source code in src/loom/mcp/bridge.py
def __init__(self, bus: MessageBus) -> None:
    # Transport-agnostic bus: NATSBus in production, InMemoryBus in tests.
    self.bus = bus

connect async

connect() -> None

Connect the underlying message bus.

Source code in src/loom/mcp/bridge.py
async def connect(self) -> None:
    """Open the underlying message-bus connection."""
    bus = self.bus
    await bus.connect()

close async

close() -> None

Close the underlying message bus.

Source code in src/loom/mcp/bridge.py
async def close(self) -> None:
    """Tear down the underlying message-bus connection."""
    bus = self.bus
    await bus.close()

call_worker async

call_worker(worker_type: str, tier: str, payload: dict[str, Any], timeout: float = 60) -> dict[str, Any]

Dispatch a task to a LOOM worker and wait for the result.

Publishes a TaskMessage to loom.tasks.incoming (where the router picks it up) and subscribes to the result subject.

Parameters:

Name Type Description Default
worker_type str

Worker name (matches worker config name).

required
tier str

Model tier (local, standard, frontier).

required
payload dict[str, Any]

Tool arguments, validated against worker input_schema.

required
timeout float

Seconds to wait for result.

60

Returns:

Type Description
dict[str, Any]

The worker's output dict.

Raises:

Type Description
BridgeError

If the worker returns a failed result.

BridgeTimeoutError

If no result arrives within timeout.

Source code in src/loom/mcp/bridge.py
async def call_worker(
    self,
    worker_type: str,
    tier: str,
    payload: dict[str, Any],
    timeout: float = 60,
) -> dict[str, Any]:
    """Dispatch a task to a LOOM worker and wait for the result.

    Publishes a ``TaskMessage`` to ``loom.tasks.incoming`` (where the
    router picks it up) and subscribes to the result subject.

    Args:
        worker_type: Worker name (matches worker config ``name``).
        tier: Model tier (local, standard, frontier).
        payload: Tool arguments, validated against worker input_schema.
        timeout: Seconds to wait for result.

    Returns:
        The worker's output dict.

    Raises:
        BridgeError: If the worker returns a failed result.
        BridgeTimeoutError: If no result arrives within timeout.
    """
    call_id = _new_id()

    message = TaskMessage(
        worker_type=worker_type,
        payload=payload,
        model_tier=ModelTier(tier),
        parent_task_id=call_id,
    )

    raw = await self._dispatch_and_wait(
        publish_subject="loom.tasks.incoming",
        message=message.model_dump(mode="json"),
        result_subject=f"loom.results.{call_id}",
        match_task_id=message.task_id,
        timeout=timeout,
    )
    return self._unwrap_result(raw)

call_pipeline async

call_pipeline(goal_context: dict[str, Any], timeout: float = 300, progress_callback: Callable[[str, int, int], Any] | None = None) -> dict[str, Any]

Submit an OrchestratorGoal and wait for the pipeline result.

Parameters:

Name Type Description Default
goal_context dict[str, Any]

Dict of context fields (e.g. {file_ref: "doc.pdf"}).

required
timeout float

Seconds to wait for the full pipeline.

300
progress_callback Callable[[str, int, int], Any] | None

Optional async/sync (stage_name, stage_idx, total) called as intermediate stage results arrive.

None

Returns:

Type Description
dict[str, Any]

The pipeline's final output dict (all stage outputs).

Source code in src/loom/mcp/bridge.py
async def call_pipeline(
    self,
    goal_context: dict[str, Any],
    timeout: float = 300,
    progress_callback: Callable[[str, int, int], Any] | None = None,
) -> dict[str, Any]:
    """Submit an OrchestratorGoal and wait for the pipeline result.

    Args:
        goal_context: Dict of context fields (e.g. ``{file_ref: "doc.pdf"}``).
        timeout: Seconds to wait for the full pipeline.
        progress_callback: Optional ``async/sync (stage_name, stage_idx, total)``
            called as intermediate stage results arrive.

    Returns:
        The pipeline's final output dict (all stage outputs).
    """
    goal = OrchestratorGoal(
        instruction="MCP tool call",
        context=goal_context,
    )

    # Subscribe before publishing so the result cannot slip past us.
    subscription = await self.bus.subscribe(f"loom.results.{goal.goal_id}")

    await self.bus.publish(
        "loom.goals.incoming",
        goal.model_dump(mode="json"),
    )
    logger.info("bridge.pipeline_dispatched", goal_id=goal.goal_id)

    try:
        # The pipeline's final result carries task_id == goal_id.
        # Intermediate stage results from individual workers arrive on the
        # same subject but with different task_ids.
        final = await self._collect_pipeline_results(
            subscription,
            goal.goal_id,
            timeout,
            progress_callback,
        )
    finally:
        await subscription.unsubscribe()

    return self._unwrap_result(final)

call_query async

call_query(worker_type: str, action: str, payload: dict[str, Any], timeout: float = 30) -> dict[str, Any]

Dispatch a query action to a LOOM query worker.

Wraps the payload with an action field and dispatches as a regular worker task.

Source code in src/loom/mcp/bridge.py
async def call_query(
    self,
    worker_type: str,
    action: str,
    payload: dict[str, Any],
    timeout: float = 30,
) -> dict[str, Any]:
    """Dispatch a query action to a LOOM query worker.

    Prepends an ``action`` field to the payload and dispatches the
    result as an ordinary worker task.
    """
    return await self.call_worker(
        worker_type=worker_type,
        tier="local",  # Query backends are always local (non-LLM).
        payload={"action": action, **payload},
        timeout=timeout,
    )

Config

config

MCP gateway configuration loading and validation.

Defines the YAML config structure for exposing LOOM workers, pipelines, query backends, and Workshop operations as MCP tools. Follows the same validation pattern as loom.core.config.

Example config::

name: "docman"
description: "Document processing and querying"
nats_url: "nats://localhost:4222"

tools:
  workers:
    - config: "configs/workers/doc_classifier.yaml"
      name: "classify_document"
      description: "Classify a document by type"
      tier: "local"

  pipelines:
    - config: "configs/orchestrators/doc_pipeline.yaml"
      name: "process_document"
      description: "Full document processing pipeline"

  queries:
    - backend: "docman.backends.duckdb_query.DocmanQueryBackend"
      backend_config:
        db_path: "/tmp/workspace/docman.duckdb"
      actions: ["search", "filter", "stats", "get"]
      name_prefix: "docman"

resources:
  workspace_dir: "/tmp/workspace"
  patterns: ["*.pdf", "*.json"]

load_mcp_config

load_mcp_config(path: str | Path) -> dict[str, Any]

Load and validate an MCP gateway config YAML.

Raises:

Type Description
ConfigValidationError

If the config has structural errors.

FileNotFoundError

If the config file doesn't exist.

YAMLError

If the file contains invalid YAML.

Source code in src/loom/mcp/config.py
def load_mcp_config(path: str | Path) -> dict[str, Any]:
    """Load and validate an MCP gateway config YAML.

    Raises:
        ConfigValidationError: If the config has structural errors.
        FileNotFoundError: If the config file doesn't exist.
        yaml.YAMLError: If the file contains invalid YAML.
    """
    from loom.core.config import ConfigValidationError

    config = load_config(path)
    problems = validate_mcp_config(config, path)
    if not problems:
        return config

    detail = "\n".join(f"  - {e}" for e in problems)
    raise ConfigValidationError(
        f"MCP config at {path} has {len(problems)} error(s):\n" + detail
    )

validate_mcp_config

validate_mcp_config(config: dict[str, Any], path: str | Path = '<unknown>') -> list[str]

Validate an MCP gateway config dict.

Returns a list of error strings (empty = valid).

Source code in src/loom/mcp/config.py
def validate_mcp_config(  # noqa: PLR0912
    config: dict[str, Any],
    path: str | Path = "<unknown>",
) -> list[str]:
    """Validate an MCP gateway config dict.

    Returns a list of error strings (empty = valid).
    """
    errors: list[str] = []

    if not isinstance(config, dict):
        return [f"MCP config at {path}: expected dict, got {type(config).__name__}"]

    # Required top-level keys.
    if "name" not in config:
        errors.append("missing required key 'name'")
    elif not isinstance(config["name"], str):
        errors.append("'name' must be a string")

    # Optional but typed.
    if "description" in config and not isinstance(config["description"], str):
        errors.append("'description' must be a string")
    if "nats_url" in config and not isinstance(config["nats_url"], str):
        errors.append("'nats_url' must be a string")

    # Tools section.
    tools = config.get("tools", {})
    if not isinstance(tools, dict):
        errors.append("'tools' must be a dict")
    else:
        errors.extend(_validate_worker_entries(tools.get("workers", [])))
        errors.extend(_validate_pipeline_entries(tools.get("pipelines", [])))
        errors.extend(_validate_query_entries(tools.get("queries", [])))
        errors.extend(_validate_workshop_config(tools.get("workshop")))
        errors.extend(_validate_session_config(tools.get("session")))

    # Resources section.
    resources = config.get("resources")
    if resources is not None:
        if not isinstance(resources, dict):
            errors.append("'resources' must be a dict")
        else:
            if "workspace_dir" not in resources:
                errors.append("resources: missing required key 'workspace_dir'")
            elif not isinstance(resources["workspace_dir"], str):
                errors.append("resources: 'workspace_dir' must be a string")
            patterns = resources.get("patterns")
            if patterns is not None and not isinstance(patterns, list):
                errors.append("resources: 'patterns' must be a list")

    return errors

Discovery

discovery

Tool discovery — introspect LOOM configs and generate MCP tool definitions.

Reads worker YAML configs, pipeline configs, and query backend classes to produce MCP Tool objects with correct inputSchema definitions. This is the core of the zero-code MCP exposure: LOOM configs already contain names, descriptions, and JSON Schema contracts — this module reshapes them into the MCP format.

Three discovery functions correspond to the three tool sources:

  • discover_worker_tools — one tool per worker config
  • discover_pipeline_tools — one tool per pipeline config
  • discover_query_tools — one tool per query action

make_tool

make_tool(name: str, description: str, input_schema: dict[str, Any]) -> dict[str, Any]

Build an MCP-compatible tool definition dict.

Source code in src/loom/mcp/discovery.py
def make_tool(
    name: str,
    description: str,
    input_schema: dict[str, Any],
) -> dict[str, Any]:
    """Assemble an MCP-compatible tool definition dict."""
    tool: dict[str, Any] = {"name": name}
    tool["description"] = description
    tool["inputSchema"] = input_schema
    return tool

discover_worker_tools

discover_worker_tools(worker_entries: list[dict[str, Any]]) -> list[dict[str, Any]]

Generate MCP tool definitions from worker config entries.

Each entry in worker_entries is a dict from the MCP gateway config::

{ "config": "path/to/worker.yaml", "name": "override", ... }

The worker YAML provides name, input_schema, and system_prompt. MCP config entries can override name and description.

Returns:

Type Description
list[dict[str, Any]]

List of tool definition dicts (one per worker entry).

Source code in src/loom/mcp/discovery.py
def discover_worker_tools(
    worker_entries: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Generate MCP tool definitions from worker config entries.

    Each entry in *worker_entries* is a dict from the MCP gateway config::

        { "config": "path/to/worker.yaml", "name": "override", ... }

    The worker YAML provides ``name``, ``input_schema``, and ``system_prompt``.
    MCP config entries can override ``name`` and ``description``.

    Returns:
        List of tool definition dicts (one per worker entry).
    """
    discovered: list[dict[str, Any]] = []

    for spec in worker_entries:
        cfg_path = spec["config"]
        try:
            worker_cfg = load_config(cfg_path)
        except Exception as exc:
            logger.warning("mcp.discovery.worker_load_failed", path=cfg_path, error=str(exc))
            continue

        name = spec.get("name", worker_cfg.get("name", "unknown_worker"))
        desc = (
            spec.get("description")
            or worker_cfg.get("description")
            or _first_line(worker_cfg.get("system_prompt", ""))
        )

        schema = worker_cfg.get("input_schema", {"type": "object"})
        # Wrap bare property maps in the required top-level object schema.
        if "type" not in schema:
            schema = {"type": "object", "properties": schema}

        tool = make_tool(name, desc, schema)
        # Stash metadata for the bridge to use when dispatching.
        tool["_loom"] = {
            "kind": "worker",
            "worker_type": worker_cfg.get("name", name),
            "tier": spec.get("tier", worker_cfg.get("default_model_tier", "local")),
            "timeout": worker_cfg.get("timeout_seconds", 60),
        }
        discovered.append(tool)

    return discovered

discover_pipeline_tools

discover_pipeline_tools(pipeline_entries: list[dict[str, Any]]) -> list[dict[str, Any]]

Generate MCP tool definitions from pipeline config entries.

The entry input schema is computed from the first stage's input_mapping: keys whose source path starts with goal.context. become the tool's input properties.

For example, input_mapping: {file_ref: "goal.context.file_ref"} produces inputSchema: {type: object, required: [file_ref], properties: {file_ref: {type: string}}}.

Source code in src/loom/mcp/discovery.py
def discover_pipeline_tools(
    pipeline_entries: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Generate MCP tool definitions from pipeline config entries.

    The tool's input schema is derived from the first stage's
    ``input_mapping``: keys whose source path starts with
    ``goal.context.`` become the tool's input properties.

    For example, ``input_mapping: {file_ref: "goal.context.file_ref"}``
    produces ``inputSchema: {type: object, required: [file_ref],
    properties: {file_ref: {type: string}}}``.
    """
    discovered: list[dict[str, Any]] = []

    for spec in pipeline_entries:
        cfg_path = spec["config"]
        try:
            pipeline_cfg = load_config(cfg_path)
        except Exception as exc:
            logger.warning("mcp.discovery.pipeline_load_failed", path=cfg_path, error=str(exc))
            continue

        name = spec["name"]  # Presence is guaranteed by config validation.
        desc = spec.get("description", f"Pipeline: {pipeline_cfg.get('name', name)}")

        # Derive the input schema from the first stage's input_mapping.
        schema = _pipeline_entry_schema(pipeline_cfg.get("pipeline_stages", []), spec)

        tool = make_tool(name, desc, schema)
        tool["_loom"] = {
            "kind": "pipeline",
            "pipeline_name": pipeline_cfg.get("name", name),
            "timeout": pipeline_cfg.get("timeout_seconds", 300),
        }
        discovered.append(tool)

    return discovered

discover_query_tools

discover_query_tools(query_entries: list[dict[str, Any]]) -> list[dict[str, Any]]

Generate MCP tool definitions from query backend entries.

Each query backend is instantiated and its _get_handlers() method is called to discover available actions. Per-action input schemas are generated from the backend's configuration (filter_fields, stats_groups, id_column, etc.).

Source code in src/loom/mcp/discovery.py
def discover_query_tools(
    query_entries: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Generate MCP tool definitions from query backend entries.

    Each query backend is instantiated and its ``_get_handlers()`` method
    is called to discover available actions.  Per-action input schemas
    are generated from the backend's configuration (filter_fields,
    stats_groups, id_column, etc.).
    """
    discovered: list[dict[str, Any]] = []

    for spec in query_entries:
        backend_path = spec["backend"]
        backend_config = spec.get("backend_config", {})
        wanted_actions = spec["actions"]
        prefix = spec["name_prefix"]

        backend = _instantiate_backend(backend_path, backend_config)
        if backend is None:
            continue

        # Ask the backend which actions it actually supports.
        try:
            handlers = backend._get_handlers()
        except AttributeError:
            logger.warning(
                "mcp.discovery.query_no_handlers",
                backend=backend_path,
                hint="Backend must have _get_handlers() method",
            )
            continue

        known = set(handlers.keys())
        for action in wanted_actions:
            if action not in known:
                logger.warning(
                    "mcp.discovery.query_unknown_action",
                    backend=backend_path,
                    action=action,
                    available=sorted(known),
                )
                continue

            tool = make_tool(
                f"{prefix}_{action}",
                _query_action_description(action, backend),
                _query_action_schema(action, backend),
            )
            tool["_loom"] = {
                "kind": "query",
                "worker_type": spec.get("worker_type", f"{prefix}_query"),
                "action": action,
                "backend_path": backend_path,
                "backend_config": backend_config,
                "timeout": spec.get("timeout", 30),
            }
            discovered.append(tool)

    return discovered

Resources

resources

Workspace resource exposure for MCP.

Exposes files in a LOOM workspace directory as MCP resources with workspace:/// URIs. Tracks file modification times to detect changes after tool calls and emit MCP resource change notifications.

Also listens on the NATS loom.resources.changed subject for external change notifications from LOOM workers (future use).

WorkspaceResources

WorkspaceResources(workspace_dir: str | Path, patterns: list[str] | None = None)

Manages workspace files as MCP resources.

Scans the workspace directory and exposes matching files as resources with workspace:///filename URIs. Maintains an mtime cache to detect changes between tool calls.

Source code in src/loom/mcp/resources.py
def __init__(
    self,
    workspace_dir: str | Path,
    patterns: list[str] | None = None,
) -> None:
    # relative filename -> last-seen mtime; drives detect_changes().
    self._mtime_cache: dict[str, float] = {}
    self.workspace_dir = Path(workspace_dir)
    # None means "expose every file in the workspace".
    self.patterns = patterns

list_resources

list_resources() -> list[dict[str, Any]]

Scan workspace and return MCP resource descriptors.

Returns:

A list of dicts matching the mcp.types.Resource shape: {uri, name, description, mimeType} — one entry per matching workspace file.

Source code in src/loom/mcp/resources.py
def list_resources(self) -> list[dict[str, Any]]:
    """Scan workspace and return MCP resource descriptors.

    Returns:
        List of dicts matching ``mcp.types.Resource`` shape:
        ``{uri, name, description, mimeType}``.
    """
    if not self.workspace_dir.is_dir():
        return []

    descriptors: list[dict[str, Any]] = []
    for entry in sorted(self.workspace_dir.iterdir()):
        if entry.is_dir():
            continue
        filename = entry.name
        if not self._matches(filename):
            continue

        guessed, _ = mimetypes.guess_type(filename)
        descriptors.append(
            {
                "uri": self._to_uri(filename),
                "name": filename,
                "description": f"Workspace file: {filename}",
                "mimeType": guessed or "application/octet-stream",
            }
        )

    return descriptors

read_resource

read_resource(uri: str) -> tuple[str | bytes, str | None]

Read a workspace resource by URI.

Parameters:

Name Type Description Default
uri str

A workspace:///filename URI.

required

Returns:

A tuple of (content, mimeType). Text files return string content; binary files return raw bytes.

Raises:

Type Description
ValueError

If the URI scheme is wrong or the file is outside the workspace (path traversal).

FileNotFoundError

If the file does not exist.

Source code in src/loom/mcp/resources.py
def read_resource(self, uri: str) -> tuple[str | bytes, str | None]:
    """Read a workspace resource by URI.

    Args:
        uri: A ``workspace:///filename`` URI.

    Returns:
        Tuple of (content, mimeType).  Text files return string content;
        binary files return raw bytes.

    Raises:
        ValueError: If the URI scheme is wrong or the file is outside
            the workspace (path traversal).
        FileNotFoundError: If the file does not exist.
    """
    rel_path = self._from_uri(uri)
    full_path = self.workspace_dir / rel_path

    # Path traversal check.
    try:
        full_path.resolve().relative_to(self.workspace_dir.resolve())
    except ValueError as exc:
        raise ValueError(f"Path traversal detected: {uri}") from exc

    if not full_path.is_file():
        raise FileNotFoundError(f"Resource not found: {uri}")

    mime, _ = mimetypes.guess_type(rel_path)
    mime = mime or "application/octet-stream"

    if mime.startswith("text/") or mime in ("application/json", "application/xml"):
        return full_path.read_text(encoding="utf-8", errors="replace"), mime

    return full_path.read_bytes(), mime

detect_changes

detect_changes() -> list[str]

Detect workspace file changes since the last check.

Compares current mtimes against the cache. Returns a list of workspace:/// URIs for new or modified files.

Call this after each tool invocation to determine which resources changed.

Source code in src/loom/mcp/resources.py
def detect_changes(self) -> list[str]:
    """Detect workspace file changes since the last check.

    Compares current mtimes against the cache.  Returns a list of
    ``workspace:///`` URIs for new or modified files.

    Call this after each tool invocation to determine which
    resources changed.
    """
    if not self.workspace_dir.is_dir():
        return []

    changed: list[str] = []
    fresh: dict[str, float] = {}
    for entry in self.workspace_dir.iterdir():
        if entry.is_dir():
            continue
        filename = entry.name
        if not self._matches(filename):
            continue
        try:
            mtime = entry.stat().st_mtime
        except OSError:
            # File vanished between iterdir and stat — skip it.
            continue
        fresh[filename] = mtime

        previous = self._mtime_cache.get(filename)
        if previous is None or mtime > previous:
            changed.append(self._to_uri(filename))

    self._mtime_cache = fresh
    return changed

snapshot

snapshot() -> None

Take a snapshot of current mtimes (no change detection).

Source code in src/loom/mcp/resources.py
def snapshot(self) -> None:
    """Record the current mtimes without reporting any changes.

    Delegates to ``detect_changes`` purely for its cache-refresh side
    effect; the returned change list is discarded.
    """
    _ = self.detect_changes()

Session Bridge

session_bridge

Session bridge — in-process dispatch for session management MCP tools.

Provides session lifecycle operations (start, end, status, sync-check, sync) as MCP tool actions. Operations execute git commands and file checks via subprocess — no NATS required.

Configuration comes from the tools.session section of the MCP gateway YAML:

.. code-block:: yaml

tools:
  session:
    framework_dir: /path/to/framework
    workspace_dir: /path/to/baft/itp-workspace
    baft_dir: /path/to/baft
    nats_url: nats://localhost:4222
    ollama_url: http://localhost:11434
    enable: [start, end, status, sync_check, sync]

SessionBridgeError

Bases: Exception

Raised when a session bridge operation fails.

SessionBridge

SessionBridge(*, framework_dir: str | Path, workspace_dir: str | Path, baft_dir: str | Path | None = None, nats_url: str = 'nats://localhost:4222', ollama_url: str = 'http://localhost:11434')

Dispatch session management operations in-process.

Source code in src/loom/mcp/session_bridge.py
def __init__(
    self,
    *,
    framework_dir: str | Path,
    workspace_dir: str | Path,
    baft_dir: str | Path | None = None,
    nats_url: str = "nats://localhost:4222",
    ollama_url: str = "http://localhost:11434",
) -> None:
    self.framework_dir = Path(framework_dir)
    self.workspace_dir = Path(workspace_dir)
    self.baft_dir = Path(baft_dir) if baft_dir else None
    self.nats_url = nats_url
    self.ollama_url = ollama_url

dispatch async

dispatch(action: str, arguments: dict[str, Any]) -> dict[str, Any]

Route to the appropriate session handler.

Source code in src/loom/mcp/session_bridge.py
async def dispatch(self, action: str, arguments: dict[str, Any]) -> dict[str, Any]:
    """Look up the handler registered for *action* and await it.

    Raises:
        SessionBridgeError: if *action* has no registered handler.
    """
    method_name = self._HANDLERS.get(action)
    if method_name is None:
        msg = f"Unknown session action: {action}. Available: {sorted(self._HANDLERS)}"
        raise SessionBridgeError(msg)
    return await getattr(self, method_name)(arguments)

Session Registry

session_registry

Lightweight session registry for MCP session tools.

File-based session markers in ~/.loom/sessions/. Compatible with baft.sessions — both read/write the same marker files.

get_active_sessions

get_active_sessions() -> list[dict[str, Any]]

Return active session dicts (non-stale markers).

Source code in src/loom/mcp/session_registry.py
def get_active_sessions() -> list[dict[str, Any]]:
    """Return active session dicts (non-stale markers).

    Scans ``_SESSION_DIR`` for ``*.json`` markers and keeps those whose
    ``last_active`` timestamp is within ``_STALE_THRESHOLD`` seconds of
    now.  Unreadable, malformed, or concurrently-deleted markers are
    silently skipped (best effort).
    """
    if not _SESSION_DIR.exists():
        return []

    now = time.time()
    active: list[dict[str, Any]] = []
    for marker in _SESSION_DIR.glob("*.json"):
        try:
            data = json.loads(marker.read_text())
            if now - data.get("last_active", 0) <= _STALE_THRESHOLD:
                active.append(data)
        except (OSError, json.JSONDecodeError, AttributeError, TypeError, KeyError):
            # OSError: marker unlinked between glob() and read_text()
            # (unregister_session races this scan).  AttributeError /
            # TypeError: JSON payload is not a dict, or last_active is
            # not a number.  Skip the marker and keep scanning.
            continue
    # Sort by last_active descending so [0] is genuinely most recent.
    active.sort(key=lambda d: d.get("last_active", 0), reverse=True)
    return active

register_session

register_session(session_id: str, **kwargs: Any) -> None

Write or update a session marker.

Source code in src/loom/mcp/session_registry.py
def register_session(session_id: str, **kwargs: Any) -> None:
    """Write or update a session marker."""
    _SESSION_DIR.mkdir(parents=True, exist_ok=True)
    marker = {"session_id": session_id, "last_active": time.time(), **kwargs}
    (_SESSION_DIR / f"{session_id}.json").write_text(json.dumps(marker))

unregister_session

unregister_session(session_id: str) -> None

Remove a session marker.

Source code in src/loom/mcp/session_registry.py
def unregister_session(session_id: str) -> None:
    """Delete the marker file for *session_id*; a missing marker is a no-op."""
    marker_path = _SESSION_DIR / f"{session_id}.json"
    marker_path.unlink(missing_ok=True)