podium-call-transcript-pipeline

Durable, idempotent ingest pipeline for Podium call transcripts — the layer between Podium's transcript webhook and the downstream RAG bridge

Tools: 8
Plugin: podium-pack
Category: saas packs

Allowed Tools

Read, Write, Edit, Bash(curl:*), Bash(jq:*), Bash(python3:*), Bash(redis-cli:*), Grep

Provided by Plugin

podium-pack

Claude Code skill pack for Podium (10 production-engineer skills)

saas packs v2.0.0

Installation

This skill is included in the podium-pack plugin:

/plugin install podium-pack@claude-code-plugins-plus


Instructions

Podium Call Transcript Pipeline

Overview

Ingest Podium phone-call transcripts and stage them on a downstream queue so an LLM (with RAG context) can assist the team answering the phone. This is not a real-time transcription tool — Podium emits transcripts on a webhook minutes-to-hours after the call ends, and the design assumes that asynchrony. The skill is the durable layer between Podium and the RAG bridge: webhook lands, transcript is verified and de-duplicated, PII is redacted on ingest, speaker structure is preserved, language is detected, and a chunked record is enqueued for the next stage.

The six production failures this skill prevents:

  1. Assuming transcripts arrive in real-time — they don't. Transcripts land on the webhook minutes to hours after the call ends. Pipelines designed around the call-ended event blocking until transcript availability either time out or hold an HTTP request open for hours. The ingest must be webhook-driven and ack-decoupled.
  2. Partial-transcript update events overwrite the final transcript — Podium can emit call.transcript.partial before call.transcript.completed. Naive handlers store the partial as final, and the LLM downstream sees a truncated transcript. The ingest must key on (transcript_id, event_type) and only promote a record to "final" on a completed event.
  3. No language detection on ingest — non-English transcripts sent to an English-only LLM produce nonsense answers that the on-phone agent reads to the customer. Detection on ingest routes non-English transcripts to a separate handling path before they reach the RAG layer.
  4. PII leakage to downstream consumers — call transcripts contain credit card numbers, full phone numbers, addresses, and dates of birth. Once these reach a third-party LLM or RAG vector store they are effectively un-redactable. Redaction must happen on ingest, before the queue write, with an auditable per-redaction log.
  5. Queueing failures lose transcripts permanently — the webhook handler returns 200 to Podium but the downstream queue write fails. The transcript is gone with no replay path. The ingest must persist the raw transcript to a local durable store before acking the webhook; the queue write happens from that durable store with retries.
  6. Missing speaker diarization fields — Podium's transcript JSON tags each segment with a speaker role (caller vs agent). Flat ingest that concatenates segments destroys the structure the LLM needs. The chunker must be speaker-aware and never split a segment across speakers.

Authentication

This skill does not authenticate to Podium directly. Two distinct auth paths are involved and both are consumed by reference from sibling skills — never re-implemented:

  • Inbound webhook auth — HMAC signature verification is delegated to podium-webhook-reliability::verify_webhook(raw, signature). The webhook secret lives in the verifier's config. The handler fails closed if the verifier is not importable.
  • Outbound Podium API auth — the fallback poller acquires an OAuth bearer token via podium-auth::PodiumAuth.get_token(). Credentials live in podium-auth's secret store.

The pipeline inherits the auth posture of both skills. Operator checklist when installing:

  1. Verify that podium-auth and podium-webhook-reliability are both installed and configured.
  2. Configure .gitignore to exclude the inbox database (*.db) and the redaction audit log (redactions.jsonl).
  3. Run a regex grep across the host repo for Podium client-secret formats and Stripe-style live keys (the canonical patterns are listed in references/implementation.md) to confirm no inline credentials leaked.
  4. Set PODIUM_TRANSCRIPT_INBOX_PATH to a writable path with 0600 permissions.
  5. Configure the downstream queue backend (Redis Streams, SQS, or SQLite-as-queue) before enabling webhook traffic.

Prerequisites

  • Python 3.10+
  • podium-auth skill installed (consumed for outbound API auth)
  • podium-webhook-reliability skill installed (consumed for HMAC verification)
  • podium-rate-limit-survival skill installed (consumed by the fallback poller)
  • A durable inbox store — SQLite default; Postgres or DynamoDB are drop-in replacements
  • A downstream queue — Redis Streams (default), AWS SQS, or local SQLite-as-queue for dev
  • langdetect (default) or fasttext-langdetect for language detection
  • presidio-analyzer + presidio-anonymizer for high-recall PII, or the bundled regex layer alone

Instructions

Build in this order. Each section neutralizes one production failure mode.

1. Webhook-driven, ack-decoupled ingest

The webhook handler returns 200 fast and does all transcript work asynchronously. The handler's only synchronous job is verify-and-store-raw; everything else happens out-of-band in the processor.


import json, time
from fastapi import FastAPI, Request, HTTPException
from podium_webhook_reliability import verify_webhook   # consumed by reference
from podium_call_transcript_pipeline import inbox

app = FastAPI()

@app.post("/podium/transcripts")
async def transcript_webhook(request: Request):
    raw = await request.body()
    sig = request.headers.get("podium-signature", "")
    if not verify_webhook(raw, sig):
        raise HTTPException(401, "invalid signature")
    event = json.loads(raw)
    if not event.get("type", "").startswith("call.transcript."):
        return {"status": "ignored"}

    # Durable write happens BEFORE returning 200. If this fails, return 5xx so Podium retries.
    inbox.insert(
        transcript_id=event["data"]["transcript_id"],
        event_type=event["type"],
        received_at=time.time(),
        raw_payload=raw,
    )
    return {"status": "accepted"}

2. Partial-vs-completed de-duplication

Podium emits these event types on a single call:

| Event type | Meaning | Handling |
|---|---|---|
| call.ended | Audio capture complete | Note arrival; no transcript yet |
| call.transcript.partial | Best-effort transcript while final generates | Store as partial; never promote to final |
| call.transcript.completed | Final transcript ready | Promote to final; supersedes any partial |
| call.transcript.failed | Transcription failed | Record failure; alert if call duration was material |

The inbox table is keyed on (transcript_id, event_type). A separate transcripts table is keyed on transcript_id alone. A completed event always supersedes a partial for the same transcript_id. A late-arriving partial after completed is ignored — the processor checks current status before writing.
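A sketch of that promotion check, assuming a transcripts table with transcript_id as its primary key (the skill's actual processor schema may differ):

```python
import sqlite3

FINAL = "call.transcript.completed"

def promote(conn: sqlite3.Connection, transcript_id: str,
            event_type: str, payload: bytes) -> str:
    """Upsert into transcripts, keyed on transcript_id alone. A completed
    event always wins; a late partial (or a redelivered completed) never
    overwrites an existing final record."""
    row = conn.execute(
        "SELECT status FROM transcripts WHERE transcript_id = ?",
        (transcript_id,)).fetchone()
    if row and row[0] == "final":
        return "ignored"   # already final: drop by design, log upstream
    status = "final" if event_type == FINAL else "partial"
    conn.execute(
        "INSERT INTO transcripts (transcript_id, status, payload) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(transcript_id) DO UPDATE "
        "SET status = excluded.status, payload = excluded.payload",
        (transcript_id, status, payload))
    conn.commit()
    return status
```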

3. Language detection on ingest

Detect language before redaction (redaction patterns are language-aware downstream). The default policy: English transcripts proceed to the standard RAG queue; non-English transcripts go to a separate queue with a translation step inserted.


from langdetect import detect_langs, DetectorFactory
DetectorFactory.seed = 0   # deterministic detection across runs

def detect_transcript_language(text: str) -> tuple[str, float]:
    if len(text.strip()) < 20:
        return ("und", 0.0)   # too short to detect reliably
    try:
        top = detect_langs(text)[0]
        return (top.lang, top.prob)
    except Exception:
        return ("und", 0.0)

Routing rule: English with confidence ≥ 0.85 → queue:rag.transcripts.en; confidence < 0.50 → queue:rag.transcripts.review (human review); otherwise → per-language queue.
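The routing rule above reduces to a small pure function; the queue names are the defaults this skill documents, and should be adjusted to match your deployment:

```python
def route(lang: str, confidence: float) -> str:
    """Map (language, confidence) from the detector to a queue name."""
    if lang == "und" or confidence < 0.50:
        return "queue:rag.transcripts.review"      # human review
    if lang == "en" and confidence >= 0.85:
        return "queue:rag.transcripts.en"          # standard RAG path
    return f"queue:rag.transcripts.{lang}"         # per-language queue
```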

4. PII redaction on ingest

Redaction is non-optional and happens before the transcript is written to the outbound queue. The redaction is auditable — for every redaction the system records category, character offsets, and the rule's id. Conservative regex layer for high-precision categories; presidio/spaCy for lower-precision recall categories (names, addresses).


import re
from dataclasses import dataclass

@dataclass
class Redaction:
    category: str
    rule_id: str
    start: int
    end: int

PATTERNS = [
    ("CREDIT_CARD", "card_luhn_16", re.compile(r"\b(?:\d[ -]?){13,19}\b")),
    ("PHONE",       "phone_intl",   re.compile(r"\+?\d{1,3}[ -]?\(?\d{2,4}\)?[ -]?\d{3,4}[ -]?\d{3,4}")),
    ("EMAIL",       "email_basic",  re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("SSN_US",      "ssn_us",       re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
]

def luhn_valid(s: str) -> bool:
    digits = [int(c) for c in s if c.isdigit()]
    if not 13 <= len(digits) <= 19: return False
    checksum = 0
    for pos, d in enumerate(reversed(digits)):
        if pos % 2 == 1:
            d *= 2
            if d > 9: d -= 9
        checksum += d
    return checksum % 10 == 0

Wire presidio after the regex pass and union both result lists into one audit log keyed by transcript_id. Never ship a redaction module that silently swallows detections — every detection must either redact-and-log or pass-through-and-log with a documented reason.
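The regex pass itself can be wired as below — a sketch, not the skill's shipped redactor. It repeats the pattern table and Luhn check so the example is self-contained, resolves overlapping matches by keeping the longer span (a phone-shaped run inside a card number must not corrupt offsets), and substitutes from the end of the string so earlier offsets stay valid:

```python
import re
from dataclasses import dataclass

@dataclass
class Redaction:
    category: str
    rule_id: str
    start: int
    end: int

PATTERNS = [
    ("CREDIT_CARD", "card_luhn_16", re.compile(r"\b(?:\d[ -]?){13,19}\b")),
    ("PHONE",       "phone_intl",   re.compile(r"\+?\d{1,3}[ -]?\(?\d{2,4}\)?[ -]?\d{3,4}[ -]?\d{3,4}")),
    ("EMAIL",       "email_basic",  re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("SSN_US",      "ssn_us",       re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
]

def luhn_valid(s: str) -> bool:
    digits = [int(c) for c in s if c.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    checksum = 0
    for pos, d in enumerate(reversed(digits)):
        if pos % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        checksum += d
    return checksum % 10 == 0

def redact(text: str) -> tuple[str, list[Redaction]]:
    """Collect detections, drop Luhn-failing card candidates, resolve
    overlaps (longest span wins), replace back-to-front."""
    found = []
    for category, rule_id, pattern in PATTERNS:
        for m in pattern.finditer(text):
            if category == "CREDIT_CARD" and not luhn_valid(m.group()):
                continue   # production must pass-through-AND-log this case
            found.append(Redaction(category, rule_id, m.start(), m.end()))
    found.sort(key=lambda r: (r.start, -(r.end - r.start)))
    audit, cursor = [], -1
    for r in found:
        if r.start >= cursor:      # skip spans nested in a longer match
            audit.append(r)
            cursor = r.end
    redacted = text
    for r in sorted(audit, key=lambda r: r.start, reverse=True):
        redacted = redacted[:r.start] + f"[{r.category}]" + redacted[r.end:]
    return redacted, audit
```

The returned audit list is what gets serialized (with the transcript_id) into redactions.jsonl.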

5. Durable inbox + queue write with replay

The webhook writes to a durable store (SQLite default) before acking. A separate processor moves records from the inbox to the outbound queue. Failed queue writes stay in the inbox with attempt_count and next_attempt_at — the processor's next scan picks them up.


SCHEMA = """
CREATE TABLE IF NOT EXISTS inbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    transcript_id TEXT NOT NULL,
    event_type    TEXT NOT NULL,
    received_at   REAL NOT NULL,
    raw_payload   BLOB NOT NULL,
    processed_at  REAL,
    enqueued_at   REAL,
    attempt_count INTEGER NOT NULL DEFAULT 0,
    next_attempt_at REAL,
    last_error    TEXT,
    UNIQUE(transcript_id, event_type)
);
"""
# UNIQUE(transcript_id, event_type) makes webhook redelivery idempotent.
# 3600 seconds = 1h cap on exponential backoff; 12-attempt budget = ~4 days before dead-letter.

Failed queue writes increment attempt_count and set next_attempt_at = now + min(2^attempts, 3600). After 12 attempts the row moves to inbox_dead_letter and pages on-call.

6. Speaker-aware chunking

Podium's transcript JSON has a segments[] array with {speaker_role, start_ms, end_ms, text} per utterance. Two rules:

  1. Never split a segment across speakers. A chunk boundary always lands at a segment boundary.
  2. Mark every chunk with its speaker turn-set. A chunk carries speakers: [...].

# target_tokens=1500 fits a chunk inside a 4k context with room for RAG-retrieved
# companion chunks plus prompt scaffolding. overlap_tokens=200 preserves cross-chunk
# context without doubling the token budget.
def chunk_transcript(segments, target_tokens=1500, overlap_tokens=200):
    chunks, current = [], Chunk(chunk_index=0)
    for seg in segments:
        seg_tokens = estimate_tokens(seg.text)
        if current.token_count + seg_tokens > target_tokens and current.segments:
            chunks.append(current)
            overlap = build_overlap(current.segments, overlap_tokens)
            current = Chunk(chunk_index=len(chunks), segments=list(overlap),
                            token_count=sum(estimate_tokens(s.text) for s in overlap))
            for s in overlap:                 # overlap segments carry their speakers too
                if s.speaker_role not in current.speakers:
                    current.speakers.append(s.speaker_role)
        current.segments.append(seg)
        current.token_count += seg_tokens
        if seg.speaker_role not in current.speakers:
            current.speakers.append(seg.speaker_role)
    if current.segments:
        chunks.append(current)
    return chunks
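chunk_transcript leans on three helpers the skill ships (Chunk, estimate_tokens, build_overlap). Hedged sketches follow — in particular, estimate_tokens here is a rough 4-characters-per-token heuristic, not a real tokenizer:

```python
from dataclasses import dataclass, field

@dataclass
class Segment:
    speaker_role: str
    start_ms: int
    end_ms: int
    text: str

@dataclass
class Chunk:
    chunk_index: int
    segments: list = field(default_factory=list)
    speakers: list = field(default_factory=list)
    token_count: int = 0

def estimate_tokens(text: str) -> int:
    # rough heuristic: ~4 characters per token for English prose
    return max(1, len(text) // 4)

def build_overlap(segments, overlap_tokens):
    """Take whole trailing segments until the overlap budget is spent --
    never a fraction of a segment, so speaker boundaries survive."""
    overlap, budget = [], overlap_tokens
    for seg in reversed(segments):
        cost = estimate_tokens(seg.text)
        if cost > budget:
            break
        overlap.insert(0, seg)
        budget -= cost
    return overlap
```

Because build_overlap only ever copies whole segments, rule 1 (never split a segment across speakers) holds for the overlap region as well as for chunk boundaries.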

The outbound record consumed by podium-rag-context-bridge carries transcript_id, call_id, location_uid, detected_language, language_confidence, redaction_count, and chunks[] with per-chunk speakers and segments.
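For illustration only, a record with hypothetical values (the field set matches the list above; the value formats and the [PHONE] placeholder style are assumptions, not Podium's):

```json
{
  "transcript_id": "example-transcript-id",
  "call_id": "example-call-id",
  "location_uid": "example-location-uid",
  "detected_language": "en",
  "language_confidence": 0.97,
  "redaction_count": 3,
  "chunks": [
    {
      "chunk_index": 0,
      "speakers": ["caller", "agent"],
      "segments": [
        {"speaker_role": "caller", "start_ms": 0, "end_ms": 4200,
         "text": "Hi, I'm calling about my [PHONE] appointment."}
      ]
    }
  ]
}
```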

Error Handling

| Code | Source | Root cause | Action |
|---|---|---|---|
| 401 on webhook | Handler | Signature verification failed | Reject; verifier config wrong |
| 5xx to Podium | Inbox write | SQLite unwritable | Return 5xx so Podium retries; page on-call |
| ERRTXP001 | Reconciler | Partial after completed | Ignored by design; logged |
| ERRTXP002 | Reconciler | No transcript within N hours | Fallback poller fetches directly |
| ERRTXP003 | Language detector | Transcript < 20 chars | Route to review queue |
| ERRTXP004 | Redactor | Presidio unavailable | Regex-only mode active; warn per transcript |
| ERRTXP005 | Queue write | Redis/SQS error | Increment attempts; exponential backoff |
| ERRTXP006 | Chunker | Single segment > target_tokens | Allow oversize chunk; warn |
| ERRTXP007 | Processor | attempts > 12 | Move to dead-letter; page on-call |

Examples

Minimal end-to-end ingest


uvicorn podium_call_transcript_pipeline.webhook_ingest:app --host 0.0.0.0 --port 8080 &
python3 scripts/transcript_chunker.py --process-loop --interval 5
sqlite3 podium_transcripts.db "SELECT transcript_id, event_type, processed_at FROM inbox ORDER BY received_at DESC LIMIT 10;"

Redact a transcript via the CLI


python3 scripts/pii_redact.py --input transcript-raw.json --output transcript-redacted.json --audit-log redactions.jsonl

Fallback poller for missing webhooks


python3 scripts/transcript_poller.py --since-hours 12 --max-age-hours 4 --location-uid "{location-uid}"

Chunk with speaker-preserving overlap


python3 scripts/transcript_chunker.py --input transcript-redacted.json --output chunks.json --target-tokens 1500 --overlap-tokens 200

Output

  • Webhook handler that verifies + durably stores transcript events before acking
  • Inbox table with UNIQUE constraint making webhook redelivery idempotent
  • Reconciler that promotes completed over partial and never the inverse
  • Language detector with deterministic seeding and confidence thresholds
  • PII redactor with auditable per-redaction log, Luhn-validated card detection
  • Speaker-aware chunker (1500-token target, 200-token overlap, never splits across speakers)
  • Fallback poller for missing-webhook recovery
  • Outbound record shape consumed directly by podium-rag-context-bridge
