RAG Pipeline for Social Media Stream Analysis
Overview
loom.contrib.rag is a contrib module for processing social media streams (currently Telegram channels) through a multi-stage pipeline: ingest, normalize, multiplex, chunk, embed, and analyze.
It is designed for Persian/RTL text but works with any language.
Quick Start (CLI)
The fastest way to use the RAG pipeline — no Python code needed:
# 1. Install and configure
uv sync --extra rag
uv run loom setup # interactive wizard
# 2. Ingest Telegram exports
uv run loom rag ingest /path/to/exports/result*.json
# 3. Search
uv run loom rag search "earthquake damage reports" --limit 10
# 4. Check statistics
uv run loom rag stats
# 5. Open the web dashboard
uv run loom rag serve --port 8080
Pass --store lancedb to use ANN search on larger datasets.
See the CLI Reference for all options.
Installation
uv sync --extra rag # DuckDB vector store backend
uv sync --extra lancedb # LanceDB vector store backend (ANN search)
uv sync --extra telegram # Live Telegram capture via Telethon
Dependencies added by [rag]: duckdb>=1.0.0, requests>=2.31.0.
Dependencies added by [lancedb]: lancedb>=0.15.0, pyarrow>=15.0.0.
Dependencies added by [telegram]: telethon>=1.36.0.
For embedding generation you also need a running Ollama server with nomic-embed-text (or another model):
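One way to confirm that the server answers embedding requests is to call it directly. The sketch below uses only the standard library and Ollama's /api/embeddings endpoint. The helper names build_embedding_request and embed are illustrative and are not part of loom.contrib.rag, which may talk to Ollama differently.

```python
import json
import urllib.request

# Default local Ollama address (the same URL used for ollama_url on this page).
OLLAMA_URL = "http://localhost:11434"


def build_embedding_request(
    text: str,
    model: str = "nomic-embed-text",
    base_url: str = OLLAMA_URL,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for Ollama's /api/embeddings."""
    payload = json.dumps({"model": model, "prompt": text}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/embeddings",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def embed(text: str, model: str = "nomic-embed-text") -> list[float]:
    """Send the request. Needs a running Ollama server with the model pulled."""
    request = build_embedding_request(text, model)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())["embedding"]
```

Calling embed("hello") against a running server should return a list of floats. If it raises a connection error, the server is not reachable at the assumed URL.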
Architecture
┌── Telegram JSON ──► TelegramIngestor ──────┐
│                                            ├──► NormalizedPost[]
└── Telegram Live ──► TelegramLiveIngestor ──┘          │
    (Telethon)                                          │
        ┌───────────────────────────────────────────────┘
        ▼
    StreamMux ──────────────► MuxedStream
    (chronological merge      (entries sorted by timestamp,
     + time windowing)         assigned to 6h windows)
        │
        ▼
    SentenceChunker ──────────► TextChunk[]
        │
┌───────┴───────┐
▼               ▼
VectorStore (ABC)         LLM Analysis Actors
├─ DuckDBVectorStore      (TrendAnalyzer, CorroborationFinder,
└─ LanceDBVectorStore      AnomalyDetector, DataExtractor)
All ingestors extend the Ingestor ABC (ingestion/base.py). All vector stores
extend the VectorStore ABC (vectorstore/base.py). Backends are configurable
via ingestor_class and store_class parameters.
Quick Start (Python API)
For programmatic access — use the RAG classes directly without CLI or infrastructure:
from datetime import timedelta
from loom.contrib.rag.ingestion.telegram_ingestor import TelegramIngestor
from loom.contrib.rag.mux.stream_mux import merge_from_ingestors
from loom.contrib.rag.schemas.mux import MuxWindowConfig
from loom.contrib.rag.chunker.sentence_chunker import ChunkConfig, chunk_mux_entry
from loom.contrib.rag.vectorstore.duckdb_store import DuckDBVectorStore
# 1. Ingest
ingestors = [
    TelegramIngestor("exports/channel_1.json", min_text_len=10).load(),
    TelegramIngestor("exports/channel_2.json", min_text_len=10).load(),
]
# 2. Multiplex
window_config = MuxWindowConfig(window_duration=timedelta(hours=6))
stream = merge_from_ingestors(ingestors, window_config=window_config)
print(f"{stream.total_entries} posts in {len(stream.window_ids)} windows")
# 3. Chunk
cfg = ChunkConfig(target_chars=400, max_chars=600)
chunks = []
for entry in stream.entries:
    chunks.extend(chunk_mux_entry(entry, config=cfg))
# 4. Store + Embed
store = DuckDBVectorStore(
    db_path="/tmp/my-rag.duckdb",
    embedding_model="nomic-embed-text",
).initialize()
stored = store.add_chunks(chunks) # embeds via Ollama
print(f"Stored {stored} chunks")
# 5. Search
results = store.search("earthquake damage reports", limit=5)
for r in results:
    print(f" [{r.score:.3f}] {r.text[:80]}...")
store.close()
Module Reference
Schemas (loom.contrib.rag.schemas)
All data models are Pydantic v2 BaseModels.
| Schema | Module | Purpose |
|---|---|---|
| NormalizedPost | schemas.post | Canonical post representation across all sources |
| Language | schemas.post | Enum: fa, ar, en, mixed, unknown |
| ChannelBias | schemas.post | Enum: state_media, independent, fact_check, etc. |
| ChannelEditorProfile | schemas.post | Trust weight + bias classification per channel |
| RawTelegramMessage | schemas.telegram | Raw Telegram JSON message (polymorphic text field) |
| TelegramChannel | schemas.telegram | Telegram channel container |
| MuxWindowConfig | schemas.mux | Tumbling/sliding window configuration |
| MuxEntry | schemas.mux | Single entry in a muxed stream (wraps NormalizedPost) |
| MuxedStream | schemas.mux | Complete multiplexed stream with window assignments |
| TextChunk | schemas.chunk | Chunked text segment with provenance |
| ChunkStrategy | schemas.chunk | Enum: sentence, paragraph, whole_post, fixed_char |
| EmbeddedChunk | schemas.embedding | Chunk with embedding vector |
| SimilarityResult | schemas.embedding | Search result with cosine similarity score |
| TrendSignal | schemas.analysis | Detected trend across channels |
| CorroborationMatch | schemas.analysis | Cross-channel corroboration |
| AnomalyFlag | schemas.analysis | Statistical or semantic anomaly |
| ExtractedData | schemas.analysis | Structured data extraction results |
Ingestion (loom.contrib.rag.ingestion)
Ingestor ABC — All ingestors extend Ingestor from ingestion/base.py:
from collections.abc import Generator
from loom.contrib.rag.ingestion.base import Ingestor
from loom.contrib.rag.schemas.post import NormalizedPost
class MyIngestor(Ingestor):
    def load(self) -> "MyIngestor": ...
    def ingest(self) -> Generator[NormalizedPost, None, None]: ...
    # ingest_all() is provided by the base class
TelegramIngestor — Parses Telegram Desktop JSON exports.
from loom.contrib.rag.ingestion.telegram_ingestor import TelegramIngestor
ingestor = TelegramIngestor(
    source_path="result.json",
    min_text_len=10, # skip messages shorter than this
).load()
# Properties
ingestor.channel_id # int
ingestor.channel_name # str
# Get all posts
posts = ingestor.ingest_all() # list[NormalizedPost]
Handles:
- Polymorphic text fields (string or list of text/entity objects)
- Reaction counts (emoji + paid)
- Forward detection
- Service message filtering
- Media type detection
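To illustrate the first item: in Telegram Desktop JSON exports the text field is either a plain string or a list that mixes strings with entity objects carrying their own "text" key. A minimal sketch of flattening both shapes follows. The function name flatten_text is hypothetical and is not TelegramIngestor's actual implementation.

```python
def flatten_text(text) -> str:
    """Collapse a Telegram export `text` field into one plain string."""
    if isinstance(text, str):
        # Simple case: the message has no formatting entities.
        return text
    parts = []
    for piece in text:
        if isinstance(piece, str):
            parts.append(piece)
        elif isinstance(piece, dict):
            # Entity objects (links, bold, mentions, ...) keep their text here.
            parts.append(piece.get("text", ""))
    return "".join(parts)


plain = flatten_text("hello")
mixed = flatten_text(["See ", {"type": "link", "text": "https://example.com"}, " now"])
```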
DEFAULT_PROFILES — Pre-configured channel bias profiles:
from loom.contrib.rag.ingestion.telegram_ingestor import DEFAULT_PROFILES
# FarsNews: state_media, trust_weight=0.3
# Iranwire: independent, trust_weight=0.8
# Hamkelasi: educational, trust_weight=0.7
# Factnameh: fact_check, trust_weight=0.9
Text Normalization (loom.contrib.rag.tools)
RTL Normalizer — Persian/Arabic text normalization:
from loom.contrib.rag.tools.rtl_normalizer import normalize
result = normalize(text)
result.text # normalized text
result.hashtags # extracted hashtags
result.mentions # extracted mentions
result.links # extracted URLs
result.has_emoji # bool
Features: ZWNJ preservation, Arabic-to-Persian character substitution (ي→ی, ك→ک), digit normalization (٠-٩→0-9, ۰-۹→0-9), emoji handling, Telegram footer stripping.
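The character substitution and digit normalization listed above can be sketched with a single translation table. This is an illustration only: normalize_chars is a hypothetical name, and the real normalize() also handles emoji, footers, and entity extraction.

```python
# Arabic letter variants mapped to their Persian forms: ي -> ی and ك -> ک.
_LETTERS = {"\u064a": "\u06cc", "\u0643": "\u06a9"}

# Arabic-Indic digits (U+0660..U+0669) and Persian digits (U+06F0..U+06F9) to ASCII.
_DIGITS = {chr(0x0660 + i): str(i) for i in range(10)}
_DIGITS.update({chr(0x06F0 + i): str(i) for i in range(10)})

_TABLE = str.maketrans({**_LETTERS, **_DIGITS})


def normalize_chars(text: str) -> str:
    """Apply the letter and digit substitutions; all other characters pass through."""
    return text.translate(_TABLE)
```

Because the zero-width non-joiner (U+200C) is not in the table, it passes through unchanged, which matches the ZWNJ preservation goal.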
Temporal Batcher — Time-window utilities:
from datetime import timedelta
from loom.contrib.rag.tools.temporal_batcher import (
    tumbling_windows, sliding_windows, daily_windows, describe_windows,
)
# Any object with a .timestamp (datetime) attribute works
windows = tumbling_windows(items, duration=timedelta(hours=6))
windows = sliding_windows(items, duration=timedelta(hours=6), step=timedelta(hours=1))
windows = daily_windows(items, tz_offset_hours=3.5) # Iran Standard Time
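As a rough picture of what tumbling windowing does, the sketch below buckets items into fixed, non-overlapping windows. It assumes windows are aligned to the Unix epoch, which may differ from how tumbling_windows aligns them. The names Item and tumbling are illustrative.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Assumed alignment point for window boundaries.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass
class Item:
    timestamp: datetime  # timezone-aware
    label: str


def tumbling(items, duration: timedelta) -> dict:
    """Group items by the start time of the fixed window containing them."""
    buckets = defaultdict(list)
    for item in items:
        index = (item.timestamp - EPOCH) // duration  # whole windows since epoch
        buckets[EPOCH + index * duration].append(item)
    return dict(sorted(buckets.items()))
```

Each item lands in exactly one window. A sliding variant would instead place an item in every window whose span covers its timestamp.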
Stream Multiplexer (loom.contrib.rag.mux)
StreamMux — Merges multiple channel streams into chronological order with window assignment:
from loom.contrib.rag.mux.stream_mux import StreamMux, merge_from_ingestors
from loom.contrib.rag.schemas.mux import MuxWindowConfig
# Manual usage
mux = StreamMux()
mux.add_stream(channel_1_posts)
mux.add_stream(channel_2_posts)
stream = mux.merge(window_config=MuxWindowConfig())
# Convenience function (from ingestors directly)
stream = merge_from_ingestors(ingestors, window_config=config)
# Access results
stream.total_entries # int
stream.channel_count # int
stream.time_span_hours # float
stream.window_ids # list[str]
stream.entries # list[MuxEntry]
stream.windows() # dict[str, list[MuxEntry]]
stream.window("w1") # list[MuxEntry] for specific window
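The chronological merge itself can be pictured as a k-way merge over per-channel streams. The sketch below uses heapq.merge on plain (timestamp, channel, text) tuples. It is a simplified stand-in for StreamMux, not its implementation, and merge_streams is a hypothetical name.

```python
import heapq
from datetime import datetime, timezone


def merge_streams(*streams):
    """Merge per-channel lists of (timestamp, channel, text) tuples by timestamp."""
    # heapq.merge expects each input to be sorted already, so sort defensively.
    ordered = [sorted(stream, key=lambda post: post[0]) for stream in streams]
    return list(heapq.merge(*ordered, key=lambda post: post[0]))


def at(hour: int) -> datetime:
    """Helper for building sample timestamps on a fixed day."""
    return datetime(2024, 1, 1, hour, tzinfo=timezone.utc)


channel_1 = [(at(3), "ch1", "x"), (at(1), "ch1", "y")]
channel_2 = [(at(2), "ch2", "z")]
merged = merge_streams(channel_1, channel_2)
```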
Chunker (loom.contrib.rag.chunker)
SentenceChunker — Persian-aware text chunking:
from loom.contrib.rag.chunker.sentence_chunker import ChunkConfig, chunk_post, chunk_mux_entry
cfg = ChunkConfig(
    target_chars=400, # soft target
    max_chars=600, # hard maximum
    overlap_chars=50, # overlap between chunks
    min_chars=20, # minimum chunk length
)
# From NormalizedPost
chunks = chunk_post(post, config=cfg)
# From MuxEntry
chunks = chunk_mux_entry(entry, config=cfg)
Splitting strategy: paragraphs first, then sentences (handles Persian sentence-enders), then fixed-char fallback.
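The three-level fallback described above can be sketched as follows. This simplified version omits target-size merging, overlap, and minimum length. The sentence-ender set (., !, ? and the Persian question mark ؟) is an assumption, and split_text is a hypothetical name.

```python
import re

# Split after a sentence ender followed by whitespace (U+061F is the Persian "?").
_SENTENCE_END = re.compile(r"(?<=[.!?\u061f])\s+")


def split_text(text: str, max_chars: int = 600) -> list[str]:
    """Split by paragraph, then sentence, then fixed-size slices as a last resort."""
    chunks = []
    for paragraph in re.split(r"\n\s*\n", text):
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        if len(paragraph) <= max_chars:
            chunks.append(paragraph)
            continue
        for sentence in _SENTENCE_END.split(paragraph):
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) <= max_chars:
                chunks.append(sentence)
            else:
                # Fixed-char fallback for a single overlong sentence.
                chunks.extend(
                    sentence[i:i + max_chars]
                    for i in range(0, len(sentence), max_chars)
                )
    return chunks
```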
Vector Store (loom.contrib.rag.vectorstore)
DuckDBVectorStore — Embedded vector database:
from loom.contrib.rag.vectorstore.duckdb_store import DuckDBVectorStore
store = DuckDBVectorStore(
    db_path="/tmp/rag.duckdb",
    embedding_model="nomic-embed-text",
    ollama_url="http://localhost:11434",
).initialize()
# Embed and store chunks (calls Ollama)
count = store.add_chunks(text_chunks, batch_size=64)
# Store pre-embedded chunks
count = store.add_embedded_chunks(embedded_chunks)
# Semantic search
results = store.search("earthquake reports", limit=10, channel_ids=[1006939659])
# CRUD
chunk = store.get("1:5:0")
store.delete("1:5:0")
store.delete_by_source("1:5")
# Stats
stats = store.stats()
store.close()
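The store uses list_cosine_similarity rather than array_cosine_similarity because embeddings are stored as FLOAT[], a variable-length list type in DuckDB, whereas array_cosine_similarity requires fixed-size array columns.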
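For reference, exact cosine-similarity search amounts to scoring every stored vector against the query and keeping the best matches. The pure-Python sketch below shows the computation. The names cosine_similarity and top_k are illustrative, and the store performs the equivalent work inside DuckDB.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


def top_k(query: list[float], vectors: dict[str, list[float]], k: int):
    """Brute-force search: score every stored vector, return the k best."""
    scored = [(cosine_similarity(query, vec), cid) for cid, vec in vectors.items()]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:k]
```

This scan is linear in the number of stored chunks, which is the cost that an ANN index such as the LanceDB backend is meant to avoid on larger datasets.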
LanceDBVectorStore — ANN vector database (faster search for large datasets):
from loom.contrib.lancedb.store import LanceDBVectorStore
store = LanceDBVectorStore(
    db_path="/tmp/rag.lance",
    embedding_model="nomic-embed-text",
    ollama_url="http://localhost:11434",
).initialize()
# Same API as DuckDBVectorStore — both extend VectorStore ABC
count = store.add_chunks(text_chunks, batch_size=64)
results = store.search("earthquake reports", limit=10)
store.close()
Install: uv sync --extra lancedb
VectorStore ABC — Both stores implement VectorStore from vectorstore/base.py. Use the store_class parameter on VectorStoreBackend to switch:
# In worker YAML config
backend_config:
  store_class: "loom.contrib.lancedb.store.LanceDBVectorStore"
  db_path: "/tmp/rag-vectors.lance"
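A store_class value like the one above is a dotted import path. A common way to resolve such a path into a class is importlib, sketched below. The helper load_class is hypothetical, and how VectorStoreBackend actually resolves the path is not shown in this document.

```python
import importlib


def load_class(dotted_path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Demonstrated with a standard-library class so the sketch runs anywhere.
decoder_cls = load_class("json.JSONDecoder")
```

With the relevant extra installed, the same call with the store_class string from the YAML would return the LanceDB store class.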
Live Telegram Capture (loom.contrib.rag.ingestion.telegram_live)
TelegramLiveIngestor — Monitors Telegram channels in real-time via Telethon:
import asyncio
from loom.contrib.rag.ingestion.telegram_live import TelegramLiveIngestor
async def main():
    ingestor = TelegramLiveIngestor(
        channels=["@farsna", "@IranIntl_Fa"],
        api_id=12345,
        api_hash="your_api_hash",
    )
    await ingestor.start()
    # Posts are buffered; drain with ingest()
    for post in ingestor.ingest():
        process(post) # process() stands in for your own handler
    # Check status
    print(ingestor.status())
    await ingestor.stop()
asyncio.run(main())
Requires: uv sync --extra telegram and Telegram API credentials
(TELEGRAM_API_ID, TELEGRAM_API_HASH env vars).
Analysis Actors (loom.contrib.rag.analysis)
LLM-powered analysis actors for trend detection, cross-channel corroboration, anomaly detection, and data extraction.
from loom.contrib.rag.analysis.llm_analyzers import (
    LLMBackend, TrendAnalyzer, CorroborationFinder,
    AnomalyDetector, DataExtractor,
)
# LLM backend (auto-routes by model prefix)
backend = LLMBackend(
    model="ollama:llama3.2:3b", # local via Ollama
    # model="anthropic:claude-sonnet-4-6", # cloud via Anthropic
)
# Trend analysis
analyzer = TrendAnalyzer(backend)
signals = analyzer.analyze(window_entries, window_id="w1",
                           window_start=start, window_end=end)
# Corroboration (cross-channel)
finder = CorroborationFinder(backend)
matches = finder.analyze(window_entries, ...)
# Anomaly detection
detector = AnomalyDetector(backend)
flags = detector.analyze(window_entries, ...)
# Data extraction (statistics, names, dates, locations)
extractor = DataExtractor(backend)
data = extractor.analyze(window_entries, ...)
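Routing by model prefix, as in the LLMBackend example above, can be pictured as splitting the model string at its first colon. The sketch below is an assumption about the general approach, not LLMBackend's code, and split_model is a hypothetical name.

```python
def split_model(model: str) -> tuple[str, str]:
    """Split 'provider:model-name' at the first colon only."""
    provider, separator, name = model.partition(":")
    if not separator or not provider or not name:
        raise ValueError(f"expected 'provider:model', got {model!r}")
    return provider, name


local = split_model("ollama:llama3.2:3b")
cloud = split_model("anthropic:claude-sonnet-4-6")
```

Splitting only at the first colon matters because Ollama model tags contain a colon of their own (llama3.2:3b).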
Loom Pipeline Integration
The RAG module includes Loom backend wrappers and YAML configs for running as distributed Loom workers.
Backend Wrappers (loom.contrib.rag.backends)
| Backend | Worker Type | Wraps |
|---|---|---|
| IngestorBackend | processor | Any Ingestor subclass (default: TelegramIngestor, configurable via ingestor_class) |
| MuxBackend | processor | StreamMux |
| ChunkerBackend | processor | SentenceChunker |
| VectorStoreBackend | processor | Any VectorStore subclass (default: DuckDBVectorStore, configurable via store_class) |
Worker Configs
Pre-built YAML configs in configs/workers/:
rag_ingestor.yaml # Telegram ingestion
rag_mux.yaml # Stream multiplexing
rag_chunker.yaml # Sentence chunking
rag_vectorstore.yaml # Vector store operations (DuckDB)
rag_vectorstore_lance.yaml # Vector store operations (LanceDB)
rag_trend_analyzer.yaml # LLM trend analysis
Running with Loom Infrastructure
# Start NATS
docker run -p 4222:4222 nats:latest
# Start router
loom router --nats-url nats://localhost:4222
# Start workers
loom processor --config configs/workers/rag_ingestor.yaml --nats-url nats://localhost:4222
loom processor --config configs/workers/rag_chunker.yaml --nats-url nats://localhost:4222
loom processor --config configs/workers/rag_vectorstore.yaml --nats-url nats://localhost:4222
# Start pipeline
loom pipeline --config configs/orchestrators/rag_pipeline.yaml --nats-url nats://localhost:4222
# Submit work
loom submit "Ingest telegram export" \
    --context source_path=/data/exports/result-1.json \
    --nats-url nats://localhost:4222
Pipeline Config
configs/orchestrators/rag_pipeline.yaml defines a 3-stage pipeline:
Each stage maps outputs to the next stage's inputs via input_mapping.
Adding a New Source (non-Telegram)
To support a new data source, extend the Ingestor ABC:
from loom.contrib.rag.ingestion.base import Ingestor
from loom.contrib.rag.schemas.post import NormalizedPost, Language
class MyIngestor(Ingestor):
    def __init__(self, source_path: str):
        self._path = source_path
        self._posts: list[NormalizedPost] = []
    def load(self) -> "MyIngestor":
        # Parse your data format
        for item in self._parse_source():
            self._posts.append(NormalizedPost(
                global_id=f"{self.channel_id}:{item['id']}",
                source_channel_id=self.channel_id,
                source_channel_name=self.channel_name,
                message_id=item["id"],
                timestamp=item["datetime"], # timezone-aware datetime
                text_clean=item["text"],
                language=Language.PERSIAN,
            ))
        return self
    def ingest(self):
        yield from self._posts
    @property
    def channel_id(self) -> int: ...
    @property
    def channel_name(self) -> str: ...
The Ingestor ABC requires:
- load(): parse the source and return self
- ingest(): yield NormalizedPost objects
- ingest_all() is provided by the base class
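The contract above can be illustrated with a self-contained stand-in. IngestorSketch and ListIngestor below are hypothetical classes written for this example. They use plain dicts instead of NormalizedPost, and the real base class in ingestion/base.py may differ in detail.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator


class IngestorSketch(ABC):
    """Stand-in for the Ingestor ABC: two abstract methods, one provided."""

    @abstractmethod
    def load(self) -> "IngestorSketch":
        """Parse the source and return self, so calls can be chained."""

    @abstractmethod
    def ingest(self) -> Iterator[dict]:
        """Yield posts one at a time."""

    def ingest_all(self) -> list[dict]:
        # Provided by the base class: collect everything ingest() yields.
        return list(self.ingest())


class ListIngestor(IngestorSketch):
    """Toy source: an in-memory list of rows, skipping empty texts."""

    def __init__(self, rows: list[dict]):
        self._rows = rows
        self._posts: list[dict] = []

    def load(self) -> "ListIngestor":
        self._posts = [row for row in self._rows if row.get("text")]
        return self

    def ingest(self) -> Iterator[dict]:
        yield from self._posts


posts = ListIngestor([{"text": "a"}, {"text": ""}]).load().ingest_all()
```

Returning self from load() is what makes the chained form Ingestor(...).load() used throughout this page possible.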
Use with IngestorBackend by setting ingestor_class in the worker config:
Testing
# Run all RAG tests
uv run pytest tests/contrib/rag/ -v
# Run LanceDB tests (requires lancedb installed)
uv run pytest tests/contrib/lancedb/ -v
# Run RAG Workshop tests
uv run pytest tests/test_workshop_rag.py -v
# Run specific module tests
uv run pytest tests/contrib/rag/test_schemas.py -v
uv run pytest tests/contrib/rag/test_ingestion.py -v
uv run pytest tests/contrib/rag/test_abstractions.py -v
uv run pytest tests/contrib/rag/test_telegram_live.py -v
uv run pytest tests/contrib/rag/test_mux.py -v
uv run pytest tests/contrib/rag/test_chunker.py -v
uv run pytest tests/contrib/rag/test_tools.py -v
uv run pytest tests/contrib/rag/test_backends.py -v
All tests run without infrastructure (no NATS, no Ollama, no DuckDB server). DuckDB runs in-memory for tests. LanceDB tests are skipped if lancedb is not installed.
Demo
# Run the full pipeline with real Telegram data (no embeddings)
python examples/rag_demo.py
# With embeddings (requires Ollama running)
python examples/rag_demo.py --embed
File Layout
src/loom/contrib/rag/
    __init__.py
    backends.py # Loom SyncProcessingBackend wrappers (configurable classes)
    schemas/
        __init__.py # Re-exports all public schemas
        post.py # NormalizedPost, Language, ChannelBias
        telegram.py # RawTelegramMessage, TelegramChannel
        mux.py # MuxWindowConfig, MuxEntry, MuxedStream
        chunk.py # TextChunk, ChunkStrategy
        embedding.py # EmbeddedChunk, SimilarityResult
        analysis.py # TrendSignal, AnomalyFlag, ExtractedData, etc.
    ingestion/
        __init__.py
        base.py # Ingestor ABC
        telegram_ingestor.py # TelegramIngestor(Ingestor) + DEFAULT_PROFILES
        telegram_live.py # TelegramLiveIngestor(Ingestor), Telethon live capture
        normalize.py # Shared normalization utilities
    mux/
        __init__.py
        stream_mux.py # StreamMux, merge_from_ingestors
    chunker/
        __init__.py
        sentence_chunker.py # ChunkConfig, chunk_post, chunk_mux_entry
    vectorstore/
        __init__.py
        base.py # VectorStore ABC
        duckdb_store.py # DuckDBVectorStore(VectorStore), exact cosine similarity
    analysis/
        __init__.py
        llm_analyzers.py # LLMBackend, TrendAnalyzer, etc.
    tools/
        __init__.py
        rtl_normalizer.py # normalize(), Persian/RTL text processing
        temporal_batcher.py # tumbling_windows, sliding_windows, daily_windows
src/loom/contrib/lancedb/
    __init__.py
    store.py # LanceDBVectorStore(VectorStore), ANN search
    tool.py # LanceDBVectorTool, LLM function-calling tool