podium-rag-context-bridge
Bridge a live Podium call transcript or webchat turn to an LLM by fetching relevant historical context and live contact state, fast enough to use mid-call.
Allowed Tools
Provided by Plugin
podium-pack
Claude Code skill pack for Podium (10 production-engineer skills)
Installation
This skill is included in the podium-pack plugin:
/plugin install podium-pack@claude-code-plugins-plus
Instructions
Podium RAG Context Bridge
Overview
Take a live transcript chunk (or webchat turn) and emit a structured RAG context bundle the calling LLM can drop into its prompt. This is not a chatbot. This is the substrate that sits between Podium's transcription stream and whatever LLM your agent loop is using — fetching the right historical context, fast enough to matter, in a shape the model can actually consume.
The substrate combines two retrieval surfaces: a vector store of past Podium conversations (embedded at ingest by podium-conversation-history-export) and a live Podium API lookup for the contact's fresh state (phone, opt-out, location, last-seen). The two surfaces answer different questions — vectors answer "what did this person ever say about this topic," the live API answers "is the phone number we're about to dial actually still their phone number." A naive RAG pipeline collapses them and ships drift to the model.
The six production failures this skill prevents:
- Relevance scoring picks wrong historical context — naive cosine similarity over top-K=5 chunks surfaces the five most similar embeddings; on real Podium corpora most of those are off-topic boilerplate ("thanks for reaching out", "have a great day"). The model sees noise, generates a generic reply, and the operator loses the customer. Fix: cross-encoder reranking + per-contact filtering BEFORE the model sees anything.
- Vector store stale vs live Podium data drift — the contact updated their phone yesterday; today the vector store still embeds the old number. The model answers "I'll call you back at (555) 0100" using a number that hasn't worked in 16 hours. Vector recall and live state are different SLAs and must be merged with live state winning on any field that can mutate.
- Transcript chunk boundaries lose context — the transcriber emits chunks every 800ms. A boundary that cuts "my order number is" / "ABC-12345" in half means neither chunk retrieves the order. Both retrieve nothing relevant. Fix: sliding-window overlap (default 200ms tail of previous chunk prepended to the embed query) plus chunk coalescing before embedding.
- LLM token budget overflow — retrieved context plus system prompt plus transcript history pushes the user prompt past the model's window. Most models silently truncate from the middle; some refuse the request. Either way the model is operating on a corrupted prompt. Fix: hard token budget per retrieval surface (default 1500 tokens of context, summarized if over).
- PII reaches LLM in raw form — the retrieved historical context still contains credit-card-like strings, full home addresses, and DOBs that were never redacted at ingest because nobody told the ingest pipeline they had to. The LLM will repeat them back. Fix: redaction filter at retrieval time as the last line of defense before emission, even if ingest is supposed to do it too.
- Context emission latency exceeds call duration — by the time the bridge returns context, the customer has already hung up. The vector store p99 is 600ms, the reranker p99 is 400ms, the Podium lookup p99 is 300ms — serially that is over a second per turn and the agent never catches up. Fix: parallel fan-out + a hard 800ms wall-clock timeout that returns whatever finished, with a structured `partial: true` flag so the LLM knows it is grounding on incomplete context.
Prerequisites
- Python 3.10+
- A populated vector store of past Podium conversations. Recommended bootstrap: run `podium-conversation-history-export` against the org's full history, embed each chunk, and write to a pgvector table (schema in `references/implementation.md`)
- A working `podium-auth` instance for the live Podium contact lookup
- An embedding model available at request time. Default: `BAAI/bge-large-en-v1.5` via `sentence-transformers` (free, local) or `text-embedding-3-small` via OpenAI (hosted, ~$0.00002/embed)
- A cross-encoder reranker for the second-stage score. Default: `BAAI/bge-reranker-base` (free, local). LLM-as-reranker is also supported but adds latency and cost
- pgvector ≥ 0.5 if using the default backend. Pinecone / Weaviate adapters are documented but not the reference path
Instructions
Build in this order. Each section neutralizes one production failure mode.
1. Sliding-window chunk coalescing (neutralizes boundary loss)
The first thing that goes wrong is upstream of every other thing: the transcript chunks themselves arrive truncated on a word boundary that matters. Before the chunk hits the embedding model, prepend the tail of the previous chunk and the head of the next chunk (if available) so the embed query sees a full clause, not a sentence fragment.
```python
from dataclasses import dataclass, field
from collections import deque
from typing import Deque

@dataclass
class TranscriptCoalescer:
    """Coalesce 800ms transcript chunks into overlap-window embed queries."""
    tail_window_ms: int = 200
    head_window_ms: int = 200
    _recent: Deque[str] = field(default_factory=lambda: deque(maxlen=3))

    def feed(self, chunk: str) -> str:
        # tail of the previous chunk, then current chunk
        prev_tail = self._tail(self._recent[-1]) if self._recent else ""
        self._recent.append(chunk)
        return f"{prev_tail} {chunk}".strip()

    def _tail(self, s: str) -> str:
        # naive: last ~30 chars; production: last word-bounded ~200ms of text
        return s[-30:] if len(s) > 30 else s
```
Without this, every embed query is doing word-boundary keyhole surgery on the corpus and missing relevant context for reasons that have nothing to do with the model.
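The overlap rule can be seen end-to-end on the order-number example from the failure list. This is a minimal standalone sketch — a character-count tail stands in for the 200ms window (an assumption for illustration; the skill's class converts the time window into text length):

```python
def coalesce(chunks: list[str], tail_chars: int = 30) -> list[str]:
    """Prepend the tail of the previous chunk to each embed query."""
    queries, prev = [], ""
    for chunk in chunks:
        queries.append(f"{prev[-tail_chars:]} {chunk}".strip())
        prev = chunk
    return queries

queries = coalesce(["my order number is", "ABC-12345"])
# the second embed query now carries the full clause:
# "my order number is ABC-12345"
```

Without the overlap, the second query would be the bare fragment `"ABC-12345"`, which retrieves nothing about the conversation that produced it.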
2. Vector query with cross-encoder reranking (neutralizes relevance noise)
Top-K cosine similarity gives you the five most-similar chunks. Most are off-topic. The cure is a second pass through a cross-encoder that scores (query, candidate) pairs directly — slower but ~10x more accurate at the top of the list. Pull top-20 from the vector store, rerank to top-5, return.
```python
from typing import Protocol

class VectorStore(Protocol):
    async def query(self, embedding: list[float], top_k: int,
                    filter: dict | None = None) -> list[dict]: ...

class Reranker(Protocol):
    async def score(self, query: str, candidates: list[str]) -> list[float]: ...

class PgvectorStore:
    """pgvector reference implementation. Replace with Pinecone/Weaviate as needed."""

    def __init__(self, dsn: str):
        self.dsn = dsn  # e.g. "postgresql://user:pass@host/db" — load from secret store

    async def query(self, embedding: list[float], top_k: int,
                    filter: dict | None = None) -> list[dict]:
        import psycopg
        contact_uid = (filter or {}).get("contact_uid")
        sql = """
            SELECT id, contact_uid, content, channel, occurred_at,
                   1 - (embedding <=> %s::vector) AS cosine_score
            FROM podium_conversations
            WHERE (%s IS NULL OR contact_uid = %s)
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """
        async with await psycopg.AsyncConnection.connect(self.dsn) as conn:
            cur = await conn.execute(sql, (embedding, contact_uid, contact_uid, embedding, top_k))
            rows = await cur.fetchall()
        return [
            {"id": r[0], "contact_uid": r[1], "content": r[2],
             "channel": r[3], "occurred_at": r[4], "score": float(r[5])}
            for r in rows
        ]

async def search_with_rerank(
    query_text: str,
    embedder, vector_store: VectorStore, reranker: Reranker,
    contact_uid: str | None = None,
    pool_k: int = 20, final_k: int = 5,
) -> list[dict]:
    """Two-stage retrieval. ANN recall to pool_k, cross-encoder rerank to final_k."""
    embedding = await embedder.embed(query_text)
    pool = await vector_store.query(embedding, top_k=pool_k,
                                    filter={"contact_uid": contact_uid} if contact_uid else None)
    if not pool:
        return []
    scores = await reranker.score(query_text, [c["content"] for c in pool])
    for c, s in zip(pool, scores):
        c["rerank_score"] = float(s)
    pool.sort(key=lambda c: c["rerank_score"], reverse=True)
    return pool[:final_k]
```
The contact_uid filter is load-bearing — for any transcript chunk where you already know who is on the line, pre-filtering to that contact's history is both faster (smaller search space) and more relevant (no cross-contact pollution).
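To make the two-stage shape concrete, here is a toy rerank pass over a cosine-ordered pool. A lexical-overlap scorer stands in for the cross-encoder (pure illustration — the reference second stage is `BAAI/bge-reranker-base`, not word overlap):

```python
def overlap_score(query: str, candidate: str) -> float:
    """Toy second-stage scorer: fraction of query words present in the candidate."""
    q = set(query.lower().split())
    c = set(candidate.lower().split())
    return len(q & c) / max(1, len(q))

# a cosine-ordered pool: boilerplate often outranks the chunk that matters
pool = [
    "thanks for reaching out",
    "have a great day",
    "order ABC-12345 shipped Tuesday",
]
query = "question about my last order"
ranked = sorted(pool, key=lambda cand: overlap_score(query, cand), reverse=True)
# the order chunk rises to the top; the boilerplate falls away
```

The point generalizes: the first stage optimizes recall over a large corpus, the second stage optimizes precision over a small pool, and conflating the two jobs is how boilerplate reaches the model.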
3. Live Podium fetch + merge with vector results (neutralizes staleness drift)
The vector store is a snapshot. The contact's phone, opt-out flag, and last-seen-at are live state that can mutate any time. Whenever the RAG bundle includes "the contact's phone number," that field MUST come from the live API, not from a year-old embedded message.
```python
import httpx
from podium_auth import PodiumAuth

LIVE_FIELDS = {"phone", "email", "opt_out_sms", "opt_out_email", "location_uid", "last_seen_at"}

async def fetch_live_contact(auth: PodiumAuth, contact_uid: str) -> dict:
    token = await auth.get_token()
    async with httpx.AsyncClient(timeout=2.0) as c:
        r = await c.get(
            f"https://api.podium.com/v4/contacts/{contact_uid}",
            headers={"Authorization": f"Bearer {token}"},
        )
    if r.status_code != 200:
        return {"error": r.status_code, "live_fields_available": False}
    body = r.json()
    return {k: body.get(k) for k in LIVE_FIELDS} | {"live_fields_available": True}

def merge_live_over_vector(vector_hits: list[dict], live_contact: dict) -> dict:
    """Live state wins on any field listed in LIVE_FIELDS. Vector hits keep only content."""
    return {
        "contact": live_contact,
        "historical_excerpts": [
            {"content": h["content"], "channel": h["channel"],
             "occurred_at": h["occurred_at"], "rerank_score": h["rerank_score"]}
            for h in vector_hits
        ],
    }
```
The merge rule is asymmetric on purpose — vector results NEVER override a live field, even if the rerank score is high. A high-scoring 2023 chunk that mentions "(555) 0100" loses to a 2026 live API response of "(555) 0199" every time.
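The asymmetry can be restated as a field-resolution rule. This `resolve` helper is hypothetical (the skill's merge keeps the surfaces in separate blocks rather than resolving per field), but it states the same policy:

```python
LIVE_FIELDS = {"phone", "email", "opt_out_sms"}

def resolve(field: str, live: dict, embedded: dict):
    """Live state wins on any mutable field, regardless of rerank score."""
    if field in LIVE_FIELDS and live.get("live_fields_available"):
        return live.get(field)
    return embedded.get(field)

live = {"phone": "(555) 0199", "live_fields_available": True}
embedded = {"phone": "(555) 0100", "rerank_score": 0.98}  # high score, stale number
winner = resolve("phone", live, embedded)  # "(555) 0199"
```

Note the guard on `live_fields_available`: when the live lookup failed, the rule falls through to embedded data rather than returning nothing, which is the same vector-only degradation the error-handling table below describes.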
4. PII redaction filter (neutralizes PII reaching the LLM)
Ingest is supposed to redact. Ingest forgot. The retrieval-time filter is the last line of defense and must NOT trust the corpus.
```python
import re

REDACTION_PATTERNS = [
    # credit-card-like 13-19 digit strings (Luhn-validated would be stronger)
    (re.compile(r"\b\d{13,19}\b"), "[REDACTED:CC]"),
    # SSN-like
    (re.compile(r"\b\d{3}-?\d{2}-?\d{4}\b"), "[REDACTED:SSN]"),
    # full street addresses (loose — opt-in per-deployment)
    (re.compile(r"\b\d{1,5}\s+\w+\s+(St|Ave|Rd|Blvd|Lane|Ln|Dr|Drive|Court|Ct)\b", re.I),
     "[REDACTED:ADDR]"),
    # email
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"), "[REDACTED:EMAIL]"),
    # E.164-ish phone
    (re.compile(r"\+?\d[\d\s\-().]{8,}\d"), "[REDACTED:PHONE]"),
    # DOB-ish dates
    (re.compile(r"\b(0?[1-9]|1[0-2])[/-](0?[1-9]|[12]\d|3[01])[/-](19|20)\d{2}\b"),
     "[REDACTED:DOB]"),
]

def redact(text: str) -> str:
    for pat, repl in REDACTION_PATTERNS:
        text = pat.sub(repl, text)
    return text

def redact_bundle(bundle: dict) -> dict:
    bundle["historical_excerpts"] = [
        {**e, "content": redact(e["content"])} for e in bundle["historical_excerpts"]
    ]
    return bundle
```
Apply `redact_bundle` AFTER `merge_live_over_vector` and BEFORE the bundle ever reaches the LLM emission path. The live-contact fields are not redacted — they are the inputs the LLM needs to compose the answer (the bridge ships them unredacted to the model and the model is responsible for not echoing the customer's own DOB back at them).
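Two representative patterns from the filter, applied to a synthetic string (a subset for illustration — the full list above also covers cards, addresses, phones, and DOBs):

```python
import re

SSN = re.compile(r"\b\d{3}-?\d{2}-?\d{4}\b")
EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")

def redact(text: str) -> str:
    # apply every pattern; order only matters when patterns overlap
    return EMAIL.sub("[REDACTED:EMAIL]", SSN.sub("[REDACTED:SSN]", text))

out = redact("reach me at jo@example.com, SSN 123-45-6789")
# "reach me at [REDACTED:EMAIL], SSN [REDACTED:SSN]"
```

Regex redaction is a floor, not a ceiling — names, free-text addresses, and unusual ID formats slip through, which is why ingest-time redaction still matters even with this retrieval-time backstop.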
5. Token-budget enforcement (neutralizes context overflow)
The LLM has a window. The bundle has whatever shape retrieval emitted. The two must reconcile before the prompt is built or the model silently truncates from the middle.
```python
def approx_tokens(s: str) -> int:
    # tiktoken-style ~4 chars / token; replace with model-specific tokenizer in prod
    return max(1, len(s) // 4)

def enforce_budget(bundle: dict, max_tokens: int = 1500) -> dict:
    """Drop lowest-rerank excerpts until under budget; hard-truncate the last survivor."""
    excerpts = bundle["historical_excerpts"]
    excerpts.sort(key=lambda e: e["rerank_score"], reverse=True)
    total = sum(approx_tokens(e["content"]) for e in excerpts)
    while total > max_tokens and len(excerpts) > 1:
        dropped = excerpts.pop()
        total -= approx_tokens(dropped["content"])
    if total > max_tokens and excerpts:
        # last-resort hard truncate of the single remaining excerpt
        e = excerpts[0]
        e["content"] = e["content"][: max_tokens * 4]
        e["truncated"] = True
        total = approx_tokens(e["content"])  # recount after the cut
    bundle["historical_excerpts"] = excerpts
    bundle["token_count"] = total
    return bundle
```
`max_tokens` is the budget for the historical-excerpts surface only — the live contact block, system prompt, and live transcript history have their own allocations in the calling agent loop. Sizing them all together is the calling agent's job; this skill is responsible for keeping retrieval inside its own slice.
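One way the calling agent loop might slice a full window, with this skill holding only the historical-excerpts allocation. The numbers are assumptions for illustration, not defaults shipped by the skill (apart from the 1500-token `enforce_budget` default):

```python
# hypothetical allocation for an 8k-token model window
WINDOW_TOKENS = 8000
BUDGETS = {
    "system_prompt": 1000,
    "live_contact": 300,
    "historical_excerpts": 1500,  # this skill's slice (enforce_budget default)
    "transcript_history": 3200,
    "response_headroom": 2000,
}
assert sum(BUDGETS.values()) <= WINDOW_TOKENS
```

The invariant worth keeping is the assert: every surface budgeted up front, with the sum provably inside the window, so no surface can push another into silent truncation.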
6. Hard timeout + parallel fan-out (neutralizes missed-the-call latency)
The bridge MUST return inside the latency budget. If reranking is slow today, the bundle ships without reranked excerpts and the LLM grounds on raw vector hits — degraded but timely. The flag `partial: true` tells the model it is grounding on incomplete context.
```python
import asyncio
import time

async def build_context(
    transcript_chunk: str,
    contact_uid: str | None,
    auth, embedder, vector_store, reranker,
    timeout_ms: int = 800,
) -> dict:
    """Fan out retrieval surfaces in parallel; merge what completes before timeout."""
    started = time.monotonic()
    deadline = started + timeout_ms / 1000.0

    async def with_deadline(coro):
        try:
            return await asyncio.wait_for(coro, timeout=max(0.01, deadline - time.monotonic()))
        except asyncio.TimeoutError:
            return None

    coalesced = transcript_chunk  # production: pass through TranscriptCoalescer
    vec_task = asyncio.create_task(
        with_deadline(search_with_rerank(coalesced, embedder, vector_store, reranker,
                                         contact_uid=contact_uid))
    )
    live_task = asyncio.create_task(
        with_deadline(fetch_live_contact(auth, contact_uid)) if contact_uid else asyncio.sleep(0)
    )
    vec_res = await vec_task
    live_res = await live_task
    vec_hits = vec_res or []
    live = live_res or {"live_fields_available": False}
    elapsed_ms = int((time.monotonic() - started) * 1000)
    bundle = merge_live_over_vector(vec_hits, live)
    bundle = redact_bundle(bundle)
    bundle = enforce_budget(bundle)
    bundle["meta"] = {
        "elapsed_ms": elapsed_ms,
        "timeout_ms": timeout_ms,
        # partial means a surface timed out, not merely that the clock ran long
        "partial": vec_res is None or (contact_uid is not None and live_res is None),
        "had_vector_hits": bool(vec_hits),
        "had_live_lookup": live.get("live_fields_available", False),
    }
    return bundle
```
The deadline is the contract. A bridge that misses the deadline is broken even if its output is perfect. Measure p99 against timeout_ms and treat sustained partial: true rates above 5% as a paging incident.
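The paging rule can be sketched as a rolling monitor over recent turns. The class name and window size are illustrative, not part of the skill; only the 5% threshold comes from the text above:

```python
from collections import deque

class PartialRateMonitor:
    """Rolling partial-rate over the last `window` turns; page above `threshold`."""

    def __init__(self, window: int = 1000, threshold: float = 0.05):
        self.events: deque[bool] = deque(maxlen=window)
        self.threshold = threshold

    def record(self, partial: bool) -> bool:
        self.events.append(partial)
        rate = sum(self.events) / len(self.events)
        return rate > self.threshold  # True => raise a page
```

Feeding it `bundle["meta"]["partial"]` after every turn gives a sustained-rate signal rather than paging on a single slow request.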
Error Handling
| Surface | Failure | Bundle behavior |
|---|---|---|
| Vector store unreachable | `vec_task` returns `None` | `historical_excerpts: []`, `meta.had_vector_hits: false` |
| Embedder model timeout | embed step exceeds budget | Skip vector surface entirely; live-only bundle |
| Reranker timeout | rerank step exceeds budget | Fall back to raw cosine score from vector store |
| Live Podium 429 | rate-limited | `contact.live_fields_available: false`; vector-only bundle |
| Live Podium 401 | auth dead | Surface to `podium-auth` decay monitor; vector-only bundle this call |
| Live Podium 404 | unknown `contact_uid` | `contact: {"error": 404}`; vector hits without contact filter |
| All surfaces timeout | hard 800ms hit | Empty bundle with `meta.partial: true` — LLM grounds on transcript alone |
Examples
Minimal: build a bundle for one transcript chunk
```bash
python3 scripts/context_fetch.py \
  --transcript "I had a question about my last order" \
  --contact-uid "ctc_abc123" \
  --pgvector-dsn "{your-pgvector-dsn}" \
  --output json
```
Output (truncated):
```json
{
  "contact": {"phone": "+15551230199", "opt_out_sms": false, "live_fields_available": true},
  "historical_excerpts": [
    {"content": "Order ABC-12345 shipped Tuesday", "channel": "webchat",
     "occurred_at": "2026-05-01T14:22:00Z", "rerank_score": 0.91}
  ],
  "token_count": 14,
  "meta": {"elapsed_ms": 412, "timeout_ms": 800, "partial": false,
           "had_vector_hits": true, "had_live_lookup": true}
}
```
Raw vector query (no live merge, no redaction)
```bash
python3 scripts/vector_query.py \
  --query "did they ask about refund policy" \
  --contact-uid "ctc_abc123" \
  --pgvector-dsn "{your-pgvector-dsn}" \
  --top-k 5
```
Build the LLM prompt from a bundle
```bash
python3 scripts/transcript_to_llm.py \
  --transcript "I had a question about my last order" \
  --bundle-file ./bundle.json \
  --system-prompt-file ./system.txt \
  --max-tokens 4000
```
Emits a structured prompt: SYSTEM block, LIVE CONTACT block, HISTORICAL CONTEXT block (with rerank scores), TRANSCRIPT TURN block. The LLM sees clearly which fields are live and which are historical.
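A minimal sketch of that four-block layout. The exact formatting `transcript_to_llm.py` emits may differ; this only shows the shape and the live-vs-historical labeling:

```python
def build_prompt(system: str, bundle: dict, turn: str) -> str:
    """Assemble SYSTEM / LIVE CONTACT / HISTORICAL CONTEXT / TRANSCRIPT TURN blocks."""
    history = "\n".join(
        f"[{e['rerank_score']:.2f}] ({e['channel']}) {e['content']}"
        for e in bundle["historical_excerpts"]
    )
    return (
        f"SYSTEM:\n{system}\n\n"
        f"LIVE CONTACT (authoritative for mutable fields):\n{bundle['contact']}\n\n"
        f"HISTORICAL CONTEXT (redacted, reranked):\n{history}\n\n"
        f"TRANSCRIPT TURN:\n{turn}"
    )

prompt = build_prompt(
    "You are the store's agent.",
    {"contact": {"phone": "+15551230199"},
     "historical_excerpts": [{"rerank_score": 0.91, "channel": "webchat",
                              "content": "Order ABC-12345 shipped Tuesday"}]},
    "I had a question about my last order",
)
```

Labeling the blocks in the prompt itself is what lets the model treat live fields as authoritative and historical excerpts as background, rather than averaging the two.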
Score a single (query, candidate) pair
```bash
python3 scripts/relevance_score.py \
  --query "did they ask about refund policy" \
  --candidate "Our policy is 30-day refunds with receipt"
# 0.87
```
Output
- `context_fetch.py`-shaped pipeline that emits a structured JSON bundle in ≤800ms
- pgvector schema + ingest hook that feeds `podium-conversation-history-export` output into the vector store
- Reranker wiring (cross-encoder default; LLM-reranker optional adapter)
- Redaction filter applied at retrieval time as the last line of defense
- Token budget enforced per surface with rerank-priority drop ordering
- Hard deadline + `partial: true` flag so the LLM never silently grounds on incomplete context
- p99 latency dashboard with vector / rerank / live-fetch broken out — sustained `partial` > 5% is a page
Resources
- Podium API docs — Contacts
- pgvector documentation
- BGE reranker — BAAI/bge-reranker-base
- config/settings.yaml — `pool_k` / `final_k`, timeout budgets, redaction patterns, embedder + reranker defaults
- references/errors.md — `ERR_RAG_*` codes with cause + solution
- references/examples.md — 10 worked examples (single-tenant, agent-loop, sandbox eval, fault drills)
- references/implementation.md — pgvector schema, Pinecone/Weaviate adapters, embedder + reranker swaps, ingest hook for the export skill
- scripts/context_fetch.py — CLI: transcript chunk + contact → JSON bundle
- scripts/vector_query.py — CLI: raw two-stage retrieval with rerank
- scripts/transcript_to_llm.py — CLI: format a turn + bundle into an LLM prompt
- scripts/relevance_score.py — CLI: score (query, candidate) ∈ [0, 1]