Contrib Modules

The loom.contrib package contains optional integrations that extend Loom's capabilities. Each module requires its own optional dependency extra.

| Module | Extra | Purpose |
| --- | --- | --- |
| contrib.duckdb | duckdb | Embedded analytics and vector search |
| contrib.lancedb | lancedb | ANN vector search via LanceDB |
| contrib.redis | redis | Production checkpoint persistence |
| contrib.rag | rag | Social media stream RAG pipeline |

See RAG How-To for the RAG pipeline guide.

Valkey/Redis Store

Production checkpoint store using Redis/Valkey. Replaces the default in-memory store for persistent orchestrator checkpoints.

store

Valkey-backed checkpoint store.

Production implementation of CheckpointStore using redis.asyncio (redis-py). The redis-py client library works unchanged with Valkey. Install with: pip install loom[redis]

Connection defaults

redis://redis:6379 — matches the Docker Compose / k8s service name. For local dev: redis://localhost:6379

RedisCheckpointStore

RedisCheckpointStore(redis_url: str = 'redis://redis:6379')

Bases: CheckpointStore

Valkey-backed checkpoint store (via redis-py client).

Thin wrapper around redis.asyncio that implements the CheckpointStore interface. Handles connection lifecycle and TTL-based expiry natively. The redis-py client works unchanged with Valkey.

Source code in src/loom/contrib/redis/store.py
def __init__(self, redis_url: str = "redis://redis:6379") -> None:
    self._redis = redis.from_url(redis_url)

set async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/loom/contrib/redis/store.py
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    if ttl_seconds:
        await self._redis.set(key, value, ex=ttl_seconds)
    else:
        await self._redis.set(key, value)

get async

get(key: str) -> str | None

Retrieve a value by key.

Source code in src/loom/contrib/redis/store.py
async def get(self, key: str) -> str | None:
    """Retrieve a value by key."""
    result = await self._redis.get(key)
    if result is None:
        return None
    # redis.asyncio returns bytes by default
    if isinstance(result, bytes):
        return result.decode()
    return result

DuckDB Query Backend

Action-dispatch query backend for DuckDB. Supports full-text search, filtering, statistics, single-row get, and vector similarity search.

query_backend

Generic DuckDB query and analytics backend for Loom workflows.

Provides a configurable action-dispatch query backend against any DuckDB table. Supports full-text search (via DuckDB FTS), attribute filtering, aggregate statistics, single-record retrieval, and vector similarity search.

Subclasses configure domain-specific behavior by passing constructor parameters (table name, columns, filter definitions, etc.) rather than overriding methods. For advanced customisation, override _get_handlers to add or replace action handlers.

Example worker config::

processing_backend: "myapp.backends.MyQueryBackend"
backend_config:
  db_path: "/tmp/workspace/data.duckdb"

See Also

- loom.worker.processor.SyncProcessingBackend: base class for sync backends
- loom.contrib.duckdb.DuckDBViewTool: LLM-callable view tool
- loom.contrib.duckdb.DuckDBVectorTool: LLM-callable vector search tool

DuckDBQueryError

Bases: BackendError

Raised when a DuckDB query operation fails.

Wraps underlying DuckDB exceptions with a descriptive message and the original cause attached via __cause__.

DuckDBQueryBackend

DuckDBQueryBackend(db_path: str = '/tmp/workspace/data.duckdb', *, table_name: str = 'documents', result_columns: list[str] | None = None, json_columns: set[str] | None = None, id_column: str = 'id', full_text_column: str | None = 'full_text', fts_fields: str = 'full_text,summary', filter_fields: dict[str, str] | None = None, stats_groups: set[str] | None = None, stats_aggregates: list[str] | None = None, default_order_by: str = 'rowid', embedding_column: str = 'embedding')

Bases: SyncProcessingBackend

Generic action-dispatch query backend for DuckDB tables.

Opens a read-only connection to the DuckDB database and dispatches to the appropriate query handler based on the action field in the payload.

All queries use parameterized statements to prevent SQL injection. Results from search/filter actions exclude large content columns (configurable via full_text_column) to keep messages small.

Parameters:

- db_path (str, default '/tmp/workspace/data.duckdb'): Path to the DuckDB database file.
- table_name (str, default 'documents'): Table to query.
- result_columns (list[str] | None, default None): Columns returned in search/filter results.
- json_columns (set[str] | None, default None): Set of column names containing JSON strings that should be parsed back into Python objects on read.
- id_column (str, default 'id'): Primary key column name for the get action.
- full_text_column (str | None, default 'full_text'): Large content column included only in get results. Set to None if no such column exists.
- fts_fields (str, default 'full_text,summary'): Comma-separated field names for DuckDB FTS match_bm25 calls (e.g. "content,summary").
- filter_fields (dict[str, str] | None, default None): Mapping of payload field names to SQL condition templates, e.g. {"min_pages": "page_count >= ?"}. Each key is checked in the payload; if present, its SQL template is added to the WHERE clause.
- stats_groups (set[str] | None, default None): Set of column names allowed as group_by values for the stats action.
- stats_aggregates (list[str] | None, default None): SQL aggregate expressions for the stats query. Defaults to ["COUNT(*) AS record_count"].
- default_order_by (str, default 'rowid'): ORDER BY clause for filter results.
- embedding_column (str, default 'embedding'): Column name for vector embeddings used in the vector_search action.
Source code in src/loom/contrib/duckdb/query_backend.py
def __init__(
    self,
    db_path: str = "/tmp/workspace/data.duckdb",
    *,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    json_columns: set[str] | None = None,
    id_column: str = "id",
    full_text_column: str | None = "full_text",
    fts_fields: str = "full_text,summary",
    filter_fields: dict[str, str] | None = None,
    stats_groups: set[str] | None = None,
    stats_aggregates: list[str] | None = None,
    default_order_by: str = "rowid",
    embedding_column: str = "embedding",
) -> None:
    self.db_path = Path(db_path)
    self.table_name = table_name
    self.result_columns = result_columns or ["id"]
    self.json_columns = json_columns or set()
    self.id_column = id_column
    self.full_text_column = full_text_column
    self.fts_fields = fts_fields
    self.filter_fields = filter_fields or {}
    self.stats_groups = stats_groups or set()
    self.stats_aggregates = stats_aggregates or ["COUNT(*) AS record_count"]
    self.default_order_by = default_order_by
    self.embedding_column = embedding_column
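The filter_fields mechanism composes a WHERE clause from whichever configured payload keys are actually present. A minimal sketch of that composition (build_where is a hypothetical helper for illustration, not the backend's actual method):

```python
def build_where(filter_fields: dict[str, str], payload: dict) -> tuple[str, list]:
    """Collect SQL condition templates for payload keys that are present."""
    conditions: list[str] = []
    params: list = []
    for field, template in filter_fields.items():
        if field in payload:
            conditions.append(template)
            params.append(payload[field])
    # No matching keys means no filtering.
    where = " AND ".join(conditions) if conditions else "TRUE"
    return where, params


filters = {"min_pages": "page_count >= ?", "author": "author = ?"}
where, params = build_where(filters, {"min_pages": 100})
print(where, params)  # page_count >= ? [100]
```

Because the payload values travel separately as bound parameters, user input never reaches the SQL text itself.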

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Dispatch a query action against the DuckDB database.

Parameters:

- payload (dict[str, Any], required): Must contain action (str). Additional fields depend on the action type.
- config (dict[str, Any], required): Worker config dict. May include db_path to override the constructor default.

Returns:

- dict[str, Any]: A dict with "output" (query results) and "model_used" (always "duckdb").

Raises:

- ValueError: If the action is unknown.
- DuckDBQueryError: If the database query fails.

Source code in src/loom/contrib/duckdb/query_backend.py
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Dispatch a query action against the DuckDB database.

    Args:
        payload: Must contain ``action`` (str). Additional fields
            depend on the action type.
        config: Worker config dict. May include ``db_path`` to
            override the constructor default.

    Returns:
        A dict with ``"output"`` (query results) and
        ``"model_used"`` (always ``"duckdb"``).

    Raises:
        ValueError: If the action is unknown.
        DuckDBQueryError: If the database query fails.
    """
    db_path = config.get("db_path", str(self.db_path))
    action = payload.get("action", "")

    handlers = self._get_handlers()

    handler = handlers.get(action)
    if not handler:
        raise ValueError(f"Unknown action '{action}'. Supported: {', '.join(handlers.keys())}")

    try:
        conn = duckdb.connect(db_path, read_only=True)
        try:
            # Load FTS extension for search queries.
            conn.execute("LOAD fts")
            result = handler(conn, payload)
        finally:
            conn.close()
    except (ValueError, DuckDBQueryError):
        raise
    except Exception as exc:
        raise DuckDBQueryError(f"Query failed (action={action}): {exc}") from exc

    return {"output": result, "model_used": "duckdb"}

DuckDB View Tool

Read-only DuckDB view exposed as an LLM-callable tool. Workers can query structured data during processing.

view_tool

DuckDB view tool — exposes a DuckDB view as an LLM-callable tool.

When configured in a worker's knowledge_silos, this tool lets the LLM query a read-only DuckDB view during reasoning. The LLM can search (full-text) or list records from the view.

Example knowledge_silos config::

knowledge_silos:
  - name: "catalog"
    type: "tool"
    provider: "loom.contrib.duckdb.DuckDBViewTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      view_name: "summaries"
      description: "Search and browse record summaries"
      max_results: 20

The tool auto-introspects the view's columns via DESCRIBE to build its JSON Schema definition. Queries use parameterized SQL to prevent injection.

DuckDBViewTool

DuckDBViewTool(db_path: str, view_name: str, description: str = 'Query a database view', max_results: int = 20)

Bases: SyncToolProvider

Expose a DuckDB view as an LLM-callable search/list tool.

The tool dynamically introspects the view's column schema at instantiation time and builds a JSON Schema tool definition that the LLM can call.

Supports two operations:

- search: full-text ILIKE search across all text columns
- list: list recent records with optional column filters

All queries are parameterized and results are capped at max_results.

Source code in src/loom/contrib/duckdb/view_tool.py
def __init__(
    self,
    db_path: str,
    view_name: str,
    description: str = "Query a database view",
    max_results: int = 20,
) -> None:
    self.db_path = db_path
    self.view_name = view_name
    self.description = description
    self.max_results = max_results
    self._columns: list[dict[str, str]] = []
    self._introspect()

get_definition

get_definition() -> dict[str, Any]

Build JSON Schema tool definition from view columns.

Source code in src/loom/contrib/duckdb/view_tool.py
def get_definition(self) -> dict[str, Any]:
    """Build JSON Schema tool definition from view columns."""
    # Build filterable columns (non-text columns that might be useful for filtering)
    filter_props: dict[str, Any] = {}
    for col in self._columns:
        col_type = col["type"].upper()
        if col_type in ("VARCHAR", "TEXT"):
            filter_props[col["name"]] = {"type": "string"}
        elif col_type in ("INTEGER", "BIGINT", "SMALLINT", "TINYINT"):
            filter_props[col["name"]] = {"type": "integer"}
        elif col_type in ("DOUBLE", "FLOAT", "DECIMAL"):
            filter_props[col["name"]] = {"type": "number"}
        elif col_type == "BOOLEAN":
            filter_props[col["name"]] = {"type": "boolean"}

    return {
        "name": f"query_{self.view_name}",
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["search", "list"],
                    "description": "search: full-text search; list: browse records",
                },
                "query": {
                    "type": "string",
                    "description": "Search query (for search operation)",
                },
                "limit": {
                    "type": "integer",
                    "description": (
                        f"Max results to return (default: 10, max: {self.max_results})"
                    ),
                },
                "filters": {
                    "type": "object",
                    "description": "Column filters (for list operation)",
                    "properties": filter_props,
                },
            },
            "required": ["operation"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Execute a query against the DuckDB view.

Source code in src/loom/contrib/duckdb/view_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Execute a query against the DuckDB view."""
    operation = arguments.get("operation", "list")
    limit = min(arguments.get("limit", 10), self.max_results)

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            if operation == "search":
                result = self._search(conn, arguments, limit)
            else:
                result = self._list(conn, arguments, limit)
        finally:
            conn.close()
    except Exception as e:
        return json.dumps({"error": str(e)})

    return json.dumps(result, default=str)

DuckDB Vector Tool

Semantic similarity search via DuckDB embeddings, exposed as an LLM tool.

vector_tool

DuckDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in DuckDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using DuckDB's list_cosine_similarity.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "loom.contrib.duckdb.DuckDBVectorTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      table_name: "documents"
      result_columns: ["id", "title", "summary", "created_at"]
      embedding_column: "embedding"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"

See Also

- loom.worker.embeddings: OllamaEmbeddingProvider
- loom.worker.tools: SyncToolProvider base class

DuckDBVectorTool

DuckDBVectorTool(db_path: str, table_name: str = 'documents', result_columns: list[str] | None = None, embedding_column: str = 'embedding', tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over DuckDB vector embeddings.

Generates a query embedding via Ollama, then uses DuckDB's list_cosine_similarity function to find the most similar records by their stored embedding vectors.

Only records with non-null embeddings are searched.

Parameters:

- db_path (str, required): Path to the DuckDB database file.
- table_name (str, default 'documents'): Table containing the records and embeddings.
- result_columns (list[str] | None, default None): Columns to include in results. If None, introspects the table schema at first use, excluding the embedding column and any column named full_text.
- embedding_column (str, default 'embedding'): Name of the column storing embedding vectors.
- tool_name (str, default 'find_similar'): Name exposed in the LLM tool definition.
- description (str, default 'Find semantically similar records'): Description exposed in the LLM tool definition.
- embedding_model (str, default 'nomic-embed-text'): Ollama model name for embedding generation.
- ollama_url (str | None, default None): Optional custom Ollama server URL.
- max_results (int, default 10): Hard cap on returned results.
Source code in src/loom/contrib/duckdb/vector_tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    embedding_column: str = "embedding",
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    self.db_path = db_path
    self.table_name = table_name
    self._result_columns = result_columns
    self.embedding_column = embedding_column
    self.tool_name = tool_name
    self.description = description
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self.max_results = max_results

result_columns property

result_columns: list[str]

Return result columns, introspecting on first access if needed.

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/loom/contrib/duckdb/vector_tool.py
def get_definition(self) -> dict[str, Any]:
    """Return tool definition for LLM function-calling."""
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language query to find similar records",
                },
                "limit": {
                    "type": "integer",
                    "description": f"Max results (default: 5, max: {self.max_results})",
                },
            },
            "required": ["query"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/loom/contrib/duckdb/vector_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Embed the query and search for similar records."""
    query = arguments.get("query", "")
    limit = min(arguments.get("limit", 5), self.max_results)

    if not query.strip():
        return json.dumps({"results": [], "total": 0})

    # Generate query embedding via Ollama.
    query_embedding = self._embed_query(query)
    if query_embedding is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            result = self._similarity_search(conn, query_embedding, limit)
        finally:
            conn.close()
    except Exception as e:
        return json.dumps({"error": str(e)})

    return json.dumps(result, default=str)

LanceDB Vector Store

ANN vector storage and search via LanceDB. Faster than DuckDB's exact cosine scan on large datasets. Implements the VectorStore ABC.

store

LanceDB-backed vector store for embedded text chunks.

Stores EmbeddedChunk records in a LanceDB table with native vector columns. Supports:

- Batch insertion of TextChunk objects (with embedding generation)
- Pre-embedded chunk insertion
- Approximate Nearest Neighbor (ANN) similarity search
- Metadata filtering (e.g. by channel_id)
- Basic CRUD (get, delete by chunk_id)

Uses Loom's OllamaEmbeddingProvider for query embedding generation.

LanceDB provides ANN indexing for faster search over large datasets compared to exact cosine similarity in DuckDB.

LanceDBVectorStore

LanceDBVectorStore(db_path: str = '/tmp/rag-vectors.lance', embedding_model: str = 'nomic-embed-text', ollama_url: str = 'http://localhost:11434')

Bases: VectorStore

Embedded vector store backed by LanceDB.

Usage::

store = LanceDBVectorStore("/tmp/rag-vectors.lance")
store.initialize()

# Embed and store chunks
store.add_chunks(chunks)

# Search
results = store.search("earthquake damage", limit=5)

store.close()
Source code in src/loom/contrib/lancedb/store.py
def __init__(
    self,
    db_path: str = "/tmp/rag-vectors.lance",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str = "http://localhost:11434",
) -> None:
    self.db_path = Path(db_path)
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self._db: Any = None
    self._table: Any = None
    self._embedding_dim: int | None = None

initialize

initialize() -> LanceDBVectorStore

Open or create the LanceDB database and table.

Source code in src/loom/contrib/lancedb/store.py
def initialize(self) -> LanceDBVectorStore:
    """Open or create the LanceDB database and table."""
    import lancedb

    self.db_path.parent.mkdir(parents=True, exist_ok=True)
    self._db = lancedb.connect(str(self.db_path))

    # Check if table already exists
    if self.TABLE_NAME in self._db.list_tables():
        self._table = self._db.open_table(self.TABLE_NAME)
    else:
        self._table = None  # Created on first insert (need schema from data)

    logger.info("Initialized LanceDB vector store at %s", self.db_path)
    return self

close

close() -> None

Close the database connection.

Source code in src/loom/contrib/lancedb/store.py
def close(self) -> None:
    """Close the database connection."""
    self._table = None
    self._db = None

add_chunks

add_chunks(chunks: list[TextChunk], batch_size: int = 64) -> int

Embed and insert TextChunk objects. Returns count of inserted rows.

Source code in src/loom/contrib/lancedb/store.py
def add_chunks(  # pragma: no cover
    self,
    chunks: list[TextChunk],
    batch_size: int = 64,
) -> int:
    """Embed and insert TextChunk objects. Returns count of inserted rows."""
    if not chunks:
        return 0

    total_inserted = 0
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c.text for c in batch]

        try:
            embeddings = self._embed_texts(texts)
        except Exception as exc:
            logger.error("Embedding batch %d failed: %s", i // batch_size, exc)
            continue

        records = []
        for chunk, emb in zip(batch, embeddings, strict=False):
            records.append(
                {
                    "chunk_id": chunk.chunk_id,
                    "source_global_id": chunk.source_global_id,
                    "source_channel_id": chunk.source_channel_id,
                    "source_channel_name": chunk.source_channel_name,
                    "text": chunk.text,
                    "char_start": chunk.char_start,
                    "char_end": chunk.char_end,
                    "chunk_index": chunk.chunk_index,
                    "total_chunks": chunk.total_chunks,
                    "strategy": chunk.strategy.value
                    if hasattr(chunk.strategy, "value")
                    else str(chunk.strategy),
                    "timestamp_unix": chunk.timestamp_unix,
                    "vector": emb,
                    "embedding_model": self.embedding_model,
                    "embedding_dim": len(emb),
                }
            )

        try:
            created = self._ensure_table(records)
            if not created and self._table is not None and records:
                self._table.add(records)
            total_inserted += len(records)
        except Exception as exc:
            logger.warning("Insert batch %d failed: %s", i // batch_size, exc)

    logger.info("Inserted %d / %d chunks into %s", total_inserted, len(chunks), self.TABLE_NAME)
    return total_inserted

add_embedded_chunks

add_embedded_chunks(chunks: list[EmbeddedChunk]) -> int

Insert pre-embedded chunks (no embedding generation needed).

Source code in src/loom/contrib/lancedb/store.py
def add_embedded_chunks(self, chunks: list[EmbeddedChunk]) -> int:
    """Insert pre-embedded chunks (no embedding generation needed)."""
    if not chunks:
        return 0

    records = [
        {
            "chunk_id": ec.chunk_id,
            "source_global_id": ec.source_global_id,
            "source_channel_id": ec.source_channel_id,
            "text": ec.text,
            "vector": ec.embedding,
            "embedding_model": ec.model,
            "embedding_dim": ec.dimensions,
            "source_channel_name": "",
            "char_start": 0,
            "char_end": 0,
            "chunk_index": 0,
            "total_chunks": 1,
            "strategy": "sentence",
            "timestamp_unix": 0,
        }
        for ec in chunks
    ]

    try:
        created = self._ensure_table(records)
        if not created and self._table is not None:
            self._table.add(records)
        return len(records)
    except Exception as exc:
        logger.warning("Insert pre-embedded chunks failed: %s", exc)
        return 0

search

search(query: str, limit: int = 10, min_score: float = 0.0, channel_ids: list[int] | None = None) -> list[SimilarityResult]

Semantic similarity search using LanceDB ANN.

Parameters:

- query (str, required): Natural language query (embedded via Ollama).
- limit (int, default 10): Maximum results to return.
- min_score (float, default 0.0): Minimum cosine similarity threshold.
- channel_ids (list[int] | None, default None): Optional filter by source channel.

Returns:

- list[SimilarityResult]: Results sorted by descending similarity.

Source code in src/loom/contrib/lancedb/store.py
def search(  # pragma: no cover
    self,
    query: str,
    limit: int = 10,
    min_score: float = 0.0,
    channel_ids: list[int] | None = None,
) -> list[SimilarityResult]:
    """
    Semantic similarity search using LanceDB ANN.

    Args:
        query:       Natural language query (embedded via Ollama)
        limit:       Maximum results to return
        min_score:   Minimum cosine similarity threshold
        channel_ids: Optional filter by source channel

    Returns:
        List of SimilarityResult sorted by descending similarity
    """
    if self._table is None:
        return []

    embeddings = self._embed_texts([query])
    if not embeddings:
        return []

    query_emb = embeddings[0]

    search_query = self._table.search(query_emb, vector_column_name="vector").limit(limit)

    if channel_ids:
        filter_expr = " OR ".join(f"source_channel_id = {cid}" for cid in channel_ids)
        search_query = search_query.where(f"({filter_expr})")

    try:
        raw_results = search_query.to_list()
    except Exception as exc:
        logger.error("LanceDB search failed: %s", exc)
        return []

    results: list[SimilarityResult] = []
    for row in raw_results:
        # LanceDB returns _distance (L2) by default; for cosine metric
        # it returns 1 - cosine_similarity, so score = 1 - _distance
        distance = row.get("_distance", 1.0)
        score = max(0.0, 1.0 - distance)

        if score < min_score:
            continue

        results.append(
            SimilarityResult(
                chunk_id=row["chunk_id"],
                text=row["text"],
                score=score,
                source_channel_id=row["source_channel_id"],
                source_global_id=row["source_global_id"],
                metadata={
                    "source_channel_name": row.get("source_channel_name", ""),
                    "timestamp_unix": row.get("timestamp_unix", 0),
                    "strategy": row.get("strategy", ""),
                },
            )
        )

    return results
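The distance-to-score conversion in the loop above can be checked in isolation. A sketch assuming a cosine metric, where LanceDB's _distance field is 1 - cosine_similarity (to_scores is a hypothetical helper, not a store method):

```python
def to_scores(rows: list[dict], min_score: float = 0.0) -> list[float]:
    """Convert LanceDB _distance values to similarity scores, dropping weak hits."""
    scores = []
    for row in rows:
        # Missing _distance is treated as maximally distant, as in search().
        score = max(0.0, 1.0 - row.get("_distance", 1.0))
        if score >= min_score:
            scores.append(score)
    return scores


rows = [{"_distance": 0.1}, {"_distance": 0.6}, {}]
print(to_scores(rows, min_score=0.5))  # keeps only the ~0.9 hit
```

Note the clamp to 0.0: with an L2 metric _distance can exceed 1.0, which would otherwise yield negative scores.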

count

count() -> int

Return total number of stored chunks.

Source code in src/loom/contrib/lancedb/store.py
def count(self) -> int:
    """Return total number of stored chunks."""
    if self._table is None:
        return 0
    return self._table.count_rows()

get

get(chunk_id: str) -> EmbeddedChunk | None

Retrieve a single embedded chunk by ID.

Source code in src/loom/contrib/lancedb/store.py
def get(self, chunk_id: str) -> EmbeddedChunk | None:
    """Retrieve a single embedded chunk by ID."""
    if self._table is None:
        return None

    try:
        results = self._table.search().where(f"chunk_id = '{chunk_id}'").limit(1).to_list()
    except Exception:
        return None

    if not results:
        return None

    row = results[0]
    return EmbeddedChunk(
        chunk_id=row["chunk_id"],
        source_global_id=row["source_global_id"],
        source_channel_id=row["source_channel_id"],
        text=row["text"],
        embedding=list(row.get("vector", [])),
        model=row.get("embedding_model", ""),
        dimensions=row.get("embedding_dim", 0),
    )

delete

delete(chunk_id: str) -> bool

Delete a chunk by ID. Returns True if a row was deleted.

Source code in src/loom/contrib/lancedb/store.py
def delete(self, chunk_id: str) -> bool:
    """Delete a chunk by ID. Returns True if a row was deleted."""
    if self._table is None:
        return False

    before = self._table.count_rows()
    try:
        self._table.delete(f"chunk_id = '{chunk_id}'")
    except Exception as exc:
        logger.warning("Delete failed for chunk %s: %s", chunk_id, exc)
        return False
    return self._table.count_rows() < before

delete_by_source

delete_by_source(source_global_id: str) -> int

Delete all chunks for a given source post. Returns count.

Source code in src/loom/contrib/lancedb/store.py
def delete_by_source(self, source_global_id: str) -> int:
    """Delete all chunks for a given source post. Returns count."""
    if self._table is None:
        return 0

    before = self._table.count_rows()
    try:
        self._table.delete(f"source_global_id = '{source_global_id}'")
    except Exception as exc:
        logger.warning("Delete by source failed for %s: %s", source_global_id, exc)
        return 0
    return before - self._table.count_rows()

stats

stats() -> dict[str, Any]

Return summary statistics about the store.

Source code in src/loom/contrib/lancedb/store.py
def stats(self) -> dict[str, Any]:
    """Return summary statistics about the store."""
    if self._table is None:
        return {"total_chunks": 0}

    try:
        total = self._table.count_rows()
        if total == 0:
            return {"total_chunks": 0}

        # Get basic stats via pandas for aggregate queries
        df = self._table.to_pandas()
        return {
            "total_chunks": total,
            "unique_posts": df["source_global_id"].nunique(),
            "unique_channels": df["source_channel_id"].nunique(),
            "earliest_timestamp": int(df["timestamp_unix"].min()),
            "latest_timestamp": int(df["timestamp_unix"].max()),
            "db_path": str(self.db_path),
        }
    except Exception as exc:
        logger.warning("Stats query failed: %s", exc)
        return {"total_chunks": self.count(), "db_path": str(self.db_path)}

LanceDB Vector Tool

Semantic similarity search via LanceDB, exposed as an LLM tool.

tool

LanceDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in LanceDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using LanceDB's ANN search.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "loom.contrib.lancedb.LanceDBVectorTool"
    config:
      db_path: "/tmp/workspace/rag-vectors.lance"
      table_name: "rag_chunks"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"

See Also

- loom.worker.embeddings: OllamaEmbeddingProvider
- loom.worker.tools: SyncToolProvider base class

LanceDBVectorTool

LanceDBVectorTool(db_path: str, table_name: str = 'rag_chunks', vector_column: str = 'vector', result_columns: list[str] | None = None, tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over LanceDB vector embeddings.

Generates a query embedding via Ollama, then uses LanceDB's ANN search to find the most similar records by their stored vectors.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db_path | str | Path to the LanceDB database directory. | required |
| table_name | str | Table containing the records and embeddings. | 'rag_chunks' |
| vector_column | str | Name of the column storing embedding vectors. | 'vector' |
| result_columns | list[str] \| None | Columns to include in results. If None, returns chunk_id, text, source_channel_id, source_global_id. | None |
| tool_name | str | Name exposed in the LLM tool definition. | 'find_similar' |
| description | str | Description exposed in the LLM tool definition. | 'Find semantically similar records' |
| embedding_model | str | Ollama model name for embedding generation. | 'nomic-embed-text' |
| ollama_url | str \| None | Optional custom Ollama server URL. | None |
| max_results | int | Hard cap on returned results. | 10 |
Source code in src/loom/contrib/lancedb/tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "rag_chunks",
    vector_column: str = "vector",
    result_columns: list[str] | None = None,
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    self.db_path = db_path
    self.table_name = table_name
    self.vector_column = vector_column
    self._result_columns = result_columns or [
        "chunk_id",
        "text",
        "source_channel_id",
        "source_global_id",
    ]
    self.tool_name = tool_name
    self.description = description
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self.max_results = max_results

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/loom/contrib/lancedb/tool.py
def get_definition(self) -> dict[str, Any]:
    """Return tool definition for LLM function-calling."""
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language query to find similar records",
                },
                "limit": {
                    "type": "integer",
                    "description": f"Max results (default: 5, max: {self.max_results})",
                },
            },
            "required": ["query"],
        },
    }
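
When the LLM emits a tool call matching this definition, the host loop routes it by name to the matching provider. A sketch of that dispatch against the execute_sync shape shown below (the registry, loop, and EchoTool stand-in are illustrative, not Loom internals):

```python
import json

def dispatch_tool_call(providers: dict, name: str, arguments: dict) -> str:
    """Route an LLM tool call to the provider registered under that name."""
    provider = providers.get(name)
    if provider is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    return provider.execute_sync(arguments)

class EchoTool:
    """Stand-in provider exposing the same execute_sync interface."""
    def execute_sync(self, arguments: dict) -> str:
        return json.dumps({"results": [arguments.get("query", "")], "total": 1})
```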

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/loom/contrib/lancedb/tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:  # pragma: no cover
    """Embed the query and search for similar records."""
    query = arguments.get("query", "")
    limit = min(arguments.get("limit", 5), self.max_results)

    if not query.strip():
        return json.dumps({"results": [], "total": 0})

    query_embedding = self._embed_query(query)
    if query_embedding is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        import lancedb

        db = lancedb.connect(self.db_path)
        if self.table_name not in db.table_names():
            return json.dumps({"results": [], "total": 0})

        table = db.open_table(self.table_name)
        raw_results = (
            table.search(query_embedding, vector_column_name=self.vector_column)
            .limit(limit)
            .to_list()
        )

        results = []
        for row in raw_results:
            record = {}
            for col in self._result_columns:
                if col in row:
                    record[col] = row[col]
            distance = row.get("_distance", 1.0)
            record["similarity"] = max(0.0, 1.0 - distance)
            results.append(record)

        return json.dumps({"results": results, "total": len(results)}, default=str)

    except Exception as e:
        return json.dumps({"error": str(e)})
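
One caveat on the similarity field: the meaning of LanceDB's `_distance` depends on the metric the table was built with, so `1 - distance` is a true cosine similarity only under the cosine metric. A sketch of the same mapping with that assumption spelled out:

```python
def distance_to_similarity(distance: float) -> float:
    """Mirror the tool's score mapping: clamp 1 - distance to [0, 1].

    This equals cosine similarity only when the table uses
    metric="cosine" (where _distance = 1 - cos). Under LanceDB's
    default L2 metric the result is merely a monotone relevance
    score, not a bounded similarity.
    """
    return max(0.0, 1.0 - distance)
```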