podium-call-transcript-pipeline
Durable, idempotent ingest pipeline for Podium call transcripts — the layer between Podium's transcript webhook and the RAG bridge
Allowed Tools
Provided by Plugin
podium-pack
Claude Code skill pack for Podium (10 production-engineer skills)
Installation
This skill is included in the podium-pack plugin:
/plugin install podium-pack@claude-code-plugins-plus
Instructions
Podium Call Transcript Pipeline
Overview
Ingest Podium phone-call transcripts and stage them on a downstream queue so an LLM (with RAG context) can assist the team answering the phone. This is not a real-time transcription tool — Podium emits transcripts on a webhook minutes-to-hours after the call ends, and the design assumes that asynchrony. The skill is the durable layer between Podium and the RAG bridge: webhook lands, transcript is verified and de-duplicated, PII is redacted on ingest, speaker structure is preserved, language is detected, and a chunked record is enqueued for the next stage.
The six production failures this skill prevents:
- Assuming transcripts arrive in real-time — they don't. Transcripts land on the webhook minutes to hours after the call ends. Pipelines designed around the call-ended event blocking until transcript availability either time out or hold an HTTP request open for hours. The ingest must be webhook-driven and ack-decoupled.
- Partial-transcript update events overwrite the final transcript — Podium can emit `call.transcript.partial` before `call.transcript.completed`. Naive handlers store the partial as final, and the LLM downstream sees a truncated transcript. The ingest must key on `(transcript_id, event_type)` and only promote a record to "final" on a `completed` event.
- No language detection on ingest — non-English transcripts sent to an English-only LLM produce nonsense answers that the on-phone agent reads to the customer. Detection on ingest routes non-English transcripts to a separate handling path before they reach the RAG layer.
- PII leakage to downstream consumers — call transcripts contain credit card numbers, full phone numbers, addresses, and dates of birth. Once these reach a third-party LLM or RAG vector store they are effectively un-redactable. Redaction must happen on ingest, before the queue write, with an auditable per-redaction log.
- Queueing failures lose transcripts permanently — the webhook handler returns 200 to Podium but the downstream queue write fails. The transcript is gone with no replay path. The ingest must persist the raw transcript to a local durable store before acking the webhook; the queue write happens from that durable store with retries.
- Missing speaker diarization fields — Podium's transcript JSON tags each segment with a speaker role (caller vs agent). Flat ingest that concatenates segments destroys the structure the LLM needs. The chunker must be speaker-aware and never split a segment across speakers.
Authentication
This skill does not authenticate to Podium directly. Two distinct auth paths are involved and both are consumed by reference from sibling skills — never re-implemented:
- Inbound webhook auth — HMAC signature verification is delegated to `podium-webhook-reliability::verify_webhook(raw, signature)`. The webhook secret lives in the verifier's config. The handler fails closed if the verifier is not importable.
- Outbound Podium API auth — the fallback poller acquires an OAuth bearer token via `podium-auth::PodiumAuth.get_token()`. Credentials live in `podium-auth`'s secret store.
The pipeline inherits the auth posture of both skills. Operator checklist when installing:
- Verify that `podium-auth` and `podium-webhook-reliability` are both installed and configured.
- Configure `.gitignore` to exclude the inbox database (`*.db`) and the redaction audit log (`redactions.jsonl`).
- Run a regex grep across the host repo for Podium client-secret formats and Stripe-style live keys (the canonical patterns are listed in `references/implementation.md`) to confirm no inline credentials leaked.
- Set `PODIUM_TRANSCRIPT_INBOX_PATH` to a writable path with mode 0600 ownership.
- Configure the downstream queue backend (Redis Streams, SQS, or SQLite-as-queue) before enabling webhook traffic.
Prerequisites
- Python 3.10+
- `podium-auth` skill installed (consumed for outbound API auth)
- `podium-webhook-reliability` skill installed (consumed for HMAC verification)
- `podium-rate-limit-survival` skill installed (consumed by the fallback poller)
- A durable inbox store — SQLite default; Postgres or DynamoDB are drop-in replacements
- A downstream queue — Redis Streams (default), AWS SQS, or local SQLite-as-queue for dev
- `langdetect` (default) or `fasttext-langdetect` for language detection
- `presidio-analyzer` + `presidio-anonymizer` for high-recall PII, or the bundled regex layer alone
Instructions
Build in this order. Each section neutralizes one production failure mode.
1. Webhook-driven, ack-decoupled ingest
The webhook handler returns 200 fast and does all transcript work asynchronously. The handler's only synchronous job is verify-and-store-raw; everything else happens out-of-band in the processor.
```python
import json, time

from fastapi import FastAPI, Request, HTTPException
from podium_webhook_reliability import verify_webhook  # consumed by reference
from podium_call_transcript_pipeline import inbox

app = FastAPI()

@app.post("/podium/transcripts")
async def transcript_webhook(request: Request):
    raw = await request.body()
    sig = request.headers.get("podium-signature", "")
    if not verify_webhook(raw, sig):
        raise HTTPException(401, "invalid signature")
    event = json.loads(raw)
    if not event.get("type", "").startswith("call.transcript."):
        return {"status": "ignored"}
    # Durable write happens BEFORE returning 200. If this fails, return 5xx so Podium retries.
    inbox.insert(
        transcript_id=event["data"]["transcript_id"],
        event_type=event["type"],
        received_at=time.time(),
        raw_payload=raw,
    )
    return {"status": "accepted"}
```
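The `inbox` module is part of this skill's package and isn't shown here; a minimal SQLite-backed sketch of what it plausibly does (table name and columns taken from the schema in section 5, trimmed to the insert path) could look like:

```python
import sqlite3

class Inbox:
    """Durable store for raw webhook payloads. INSERT OR IGNORE plus the
    UNIQUE(transcript_id, event_type) constraint makes redelivery idempotent."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS inbox ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " transcript_id TEXT NOT NULL,"
            " event_type TEXT NOT NULL,"
            " received_at REAL NOT NULL,"
            " raw_payload BLOB NOT NULL,"
            " UNIQUE(transcript_id, event_type))"
        )

    def insert(self, transcript_id, event_type, received_at, raw_payload):
        # A redelivered event hits the UNIQUE constraint and is silently ignored,
        # so the handler can return 200 to Podium on every delivery.
        self.conn.execute(
            "INSERT OR IGNORE INTO inbox"
            " (transcript_id, event_type, received_at, raw_payload)"
            " VALUES (?, ?, ?, ?)",
            (transcript_id, event_type, received_at, raw_payload),
        )
        self.conn.commit()
```

Any failure raised here propagates out of the handler as a 5xx, which is exactly the retry signal Podium needs.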
2. Partial-vs-completed de-duplication
Podium emits these event types on a single call:
| Event type | Meaning | Handling |
|---|---|---|
| `call.ended` | Audio capture complete | Note arrival; no transcript yet |
| `call.transcript.partial` | Best-effort transcript while final generates | Store as partial; never promote to final |
| `call.transcript.completed` | Final transcript ready | Promote to final; supersedes any partial |
| `call.transcript.failed` | Transcription failed | Record failure; alert if call duration was material |
The inbox table is keyed on `(transcript_id, event_type)`. A separate transcripts table is keyed on `transcript_id` alone. A `completed` event always supersedes a `partial` for the same `transcript_id`. A late-arriving `partial` after `completed` is ignored — the processor checks current status before writing.
3. Language detection on ingest
Detect language before redaction (redaction patterns are language-aware downstream). The default policy: English transcripts proceed to the standard RAG queue; non-English transcripts go to a separate queue with a translation step inserted.
```python
from langdetect import detect_langs, DetectorFactory

DetectorFactory.seed = 0  # deterministic detection across runs

def detect_transcript_language(text: str) -> tuple[str, float]:
    if len(text.strip()) < 20:
        return ("und", 0.0)  # too short to detect reliably
    try:
        top = detect_langs(text)[0]
        return (top.lang, top.prob)
    except Exception:
        return ("und", 0.0)
```
Routing rule: English with confidence ≥ 0.85 → `queue:rag.transcripts.en`; confidence < 0.50 → `queue:rag.transcripts.review` (human review); otherwise → per-language queue.
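The routing rule can be sketched as a single function (queue names taken from the text; the per-language queue naming convention is an assumption):

```python
def route_queue(lang: str, confidence: float) -> str:
    """Map a (language, confidence) pair from detection to a queue name."""
    # Low-confidence or undetermined detections always go to human review.
    if confidence < 0.50 or lang == "und":
        return "queue:rag.transcripts.review"
    # High-confidence English takes the standard RAG path.
    if lang == "en" and confidence >= 0.85:
        return "queue:rag.transcripts.en"
    # Everything else routes to a per-language queue with a translation step.
    return f"queue:rag.transcripts.{lang}"
```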
4. PII redaction on ingest
Redaction is non-optional and happens before the transcript is written to the outbound queue. The redaction is auditable — for every redaction the system records category, character offsets, and the rule's id. Conservative regex layer for high-precision categories; presidio/spaCy for lower-precision recall categories (names, addresses).
```python
import re
from dataclasses import dataclass

@dataclass
class Redaction:
    category: str
    rule_id: str
    start: int
    end: int

PATTERNS = [
    ("CREDIT_CARD", "card_luhn_16", re.compile(r"\b(?:\d[ -]?){13,19}\b")),
    ("PHONE", "phone_intl", re.compile(r"\+?\d{1,3}[ -]?\(?\d{2,4}\)?[ -]?\d{3,4}[ -]?\d{3,4}")),
    ("EMAIL", "email_basic", re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("SSN_US", "ssn_us", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
]

def luhn_valid(s: str) -> bool:
    digits = [int(c) for c in s if c.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    checksum = 0
    for pos, d in enumerate(reversed(digits)):
        if pos % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        checksum += d
    return checksum % 10 == 0
```
Wire presidio after the regex pass and union both result lists into one audit log keyed by transcript_id. Never ship a redaction module that silently swallows detections — every detection must either redact-and-log or pass-through-and-log with a documented reason.
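A sketch of the regex pass wired to the Luhn filter, with the audit trail the text requires (the `[CATEGORY]` mask format is an assumption; `PATTERNS` is abbreviated to two rules here to keep the example self-contained):

```python
import re
from dataclasses import dataclass

@dataclass
class Redaction:
    category: str
    rule_id: str
    start: int
    end: int

PATTERNS = [
    ("CREDIT_CARD", "card_luhn_16", re.compile(r"\b(?:\d[ -]?){13,19}\b")),
    ("EMAIL", "email_basic", re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
]

def luhn_valid(s: str) -> bool:
    digits = [int(c) for c in s if c.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    checksum = 0
    for pos, d in enumerate(reversed(digits)):
        if pos % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        checksum += d
    return checksum % 10 == 0

def redact(text: str) -> tuple[str, list[Redaction]]:
    """Collect matches (card candidates must also pass Luhn), log each with
    original-text offsets, then mask the spans with their category name."""
    found = []
    for category, rule_id, pattern in PATTERNS:
        for m in pattern.finditer(text):
            if category == "CREDIT_CARD" and not luhn_valid(m.group()):
                continue  # 13-19 digit runs that fail Luhn are left alone
            found.append(Redaction(category, rule_id, m.start(), m.end()))
    # Mask from the end of the text so earlier offsets stay valid.
    out = text
    for r in sorted(found, key=lambda r: r.start, reverse=True):
        out = out[:r.start] + f"[{r.category}]" + out[r.end:]
    return out, found
```

The returned `Redaction` list is what gets serialized into the per-transcript audit log.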
5. Durable inbox + queue write with replay
The webhook writes to a durable store (SQLite default) before acking. A separate processor moves records from the inbox to the outbound queue. Failed queue writes stay in the inbox with `attempt_count` and `next_attempt_at` — the processor's next scan picks them up.
```python
SCHEMA = """
CREATE TABLE IF NOT EXISTS inbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    transcript_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    received_at REAL NOT NULL,
    raw_payload BLOB NOT NULL,
    processed_at REAL,
    enqueued_at REAL,
    attempt_count INTEGER NOT NULL DEFAULT 0,
    next_attempt_at REAL,
    last_error TEXT,
    UNIQUE(transcript_id, event_type)
);
"""
# UNIQUE(transcript_id, event_type) makes webhook redelivery idempotent.
# 3600 seconds = 1h cap on exponential backoff; 12-attempt budget = ~4 days before dead-letter.
```
Failed queue writes increment `attempt_count` and set `next_attempt_at = now + min(2^attempts, 3600)`. After 12 attempts the row moves to `inbox_dead_letter` and pages on-call.
6. Speaker-aware chunking
Podium's transcript JSON has a `segments[]` array with `{speaker_role, start_ms, end_ms, text}` per utterance. Two rules:
- Never split a segment across speakers. A chunk boundary always lands at a segment boundary.
- Mark every chunk with its speaker turn-set. A chunk carries `speakers: [...]`.
```python
# target_tokens=1500 fits a chunk inside a 4k context with room for RAG-retrieved
# companion chunks plus prompt scaffolding. overlap_tokens=200 preserves cross-chunk
# context without doubling the token budget.
def chunk_transcript(segments, target_tokens=1500, overlap_tokens=200):
    chunks, current = [], Chunk(chunk_index=0)
    for seg in segments:
        seg_tokens = estimate_tokens(seg.text)
        if current.token_count + seg_tokens > target_tokens and current.segments:
            chunks.append(current)
            overlap = build_overlap(current.segments, overlap_tokens)
            current = Chunk(chunk_index=len(chunks), segments=list(overlap),
                            token_count=sum(estimate_tokens(s.text) for s in overlap))
            for s in overlap:  # carried-over segments count toward the new chunk's speaker set
                if s.speaker_role not in current.speakers:
                    current.speakers.append(s.speaker_role)
        current.segments.append(seg)
        current.token_count += seg_tokens
        if seg.speaker_role not in current.speakers:
            current.speakers.append(seg.speaker_role)
    if current.segments:
        chunks.append(current)
    return chunks
```
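The `Chunk`, `estimate_tokens`, and `build_overlap` helpers referenced by `chunk_transcript` are not shown in this excerpt; a minimal sketch of plausible implementations (the 4-characters-per-token heuristic is an assumption, not the skill's actual tokenizer):

```python
from dataclasses import dataclass, field

@dataclass
class Segment:
    speaker_role: str
    start_ms: int
    end_ms: int
    text: str

@dataclass
class Chunk:
    chunk_index: int
    segments: list = field(default_factory=list)
    speakers: list = field(default_factory=list)
    token_count: int = 0

def estimate_tokens(text: str) -> int:
    # Rough heuristic: ~4 characters per token for English text.
    return max(1, len(text) // 4)

def build_overlap(segments, overlap_tokens: int):
    """Take whole trailing segments until the overlap budget is spent.
    Whole segments only, so the overlap never splits a speaker turn."""
    overlap, budget = [], overlap_tokens
    for seg in reversed(segments):
        cost = estimate_tokens(seg.text)
        if cost > budget:
            break
        overlap.insert(0, seg)
        budget -= cost
    return overlap
```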
The outbound record consumed by `podium-rag-context-bridge` carries `transcript_id`, `call_id`, `location_uid`, `detected_language`, `language_confidence`, `redaction_count`, and `chunks[]` with per-chunk speakers and segments.
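For illustration, an outbound record with one chunk might look like this (all field values are hypothetical; only the field names come from the text above):

```json
{
  "transcript_id": "txp_123",
  "call_id": "call_456",
  "location_uid": "loc_789",
  "detected_language": "en",
  "language_confidence": 0.97,
  "redaction_count": 3,
  "chunks": [
    {
      "chunk_index": 0,
      "speakers": ["caller", "agent"],
      "segments": [
        {"speaker_role": "caller", "start_ms": 0, "end_ms": 4200,
         "text": "Hi, I'm calling about my order."}
      ]
    }
  ]
}
```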
Error Handling
| Code | Source | Root Cause | Action |
|---|---|---|---|
| 401 on webhook | Handler | Signature verification failed | Reject; verifier config wrong |
| 5xx to Podium | Inbox write failed | SQLite unwritable | Return 5xx so Podium retries; page on-call |
| ERRTXP001 | Reconciler | `partial` after `completed` | Ignored by design; logged |
| ERRTXP002 | Reconciler | No transcript within N hours | Fallback poller fetches directly |
| ERRTXP003 | Language detector | Transcript < 20 chars | Route to review queue |
| ERRTXP004 | Redactor | Presidio unavailable | Regex-only mode active; warn per transcript |
| ERRTXP005 | Queue write | Redis/SQS error | Increment attempts; exponential backoff |
| ERRTXP006 | Chunker | Single segment > `target_tokens` | Allow oversize chunk; warn |
| ERRTXP007 | Processor | attempts > 12 | Move to dead-letter; page on-call |
Examples
Minimal end-to-end ingest
```shell
uvicorn podium_call_transcript_pipeline.webhook_ingest:app --host 0.0.0.0 --port 8080 &
python3 scripts/transcript_chunker.py --process-loop --interval 5
sqlite3 podium_transcripts.db "SELECT transcript_id, event_type, processed_at FROM inbox ORDER BY received_at DESC LIMIT 10;"
```
Redact a transcript via the CLI
```shell
python3 scripts/pii_redact.py --input transcript-raw.json --output transcript-redacted.json --audit-log redactions.jsonl
```
Fallback poller for missing webhooks
```shell
python3 scripts/transcript_poller.py --since-hours 12 --max-age-hours 4 --location-uid "{location-uid}"
```
Chunk with speaker-preserving overlap
```shell
python3 scripts/transcript_chunker.py --input transcript-redacted.json --output chunks.json --target-tokens 1500 --overlap-tokens 200
```
Output
- Webhook handler that verifies + durably stores transcript events before acking
- Inbox table with UNIQUE constraint making webhook redelivery idempotent
- Reconciler that promotes `completed` over `partial` and never the inverse
- Language detector with deterministic seeding and confidence thresholds
- PII redactor with auditable per-redaction log, Luhn-validated card detection
- Speaker-aware chunker (1500-token target, 200-token overlap, never splits across speakers)
- Fallback poller for missing-webhook recovery
- Outbound record shape consumed directly by `podium-rag-context-bridge`
Resources
- Podium API docs — Conversations & Transcripts
- Podium Webhooks reference
- config/settings.yaml — chunk size, redaction policy, queue backend, language routing
- references/errors.md — ERRTXP* codes with cause + solution
- references/examples.md — 10 worked examples (multi-location, presidio, SQS, AU PII)
- references/implementation.md — Node port, presidio wiring, queue backend tradeoffs, dead-letter handling
- scripts/webhook_ingest.py — FastAPI handler (verify + inbox insert + 200)
- scripts/transcript_poller.py — CLI: poll conversations API when webhook missed
- scripts/pii_redact.py — CLI: redact a transcript JSON + audit log
- scripts/transcript_chunker.py — CLI: chunk with speaker-preserving overlap