podium-conversation-history-export

Bulk-export Podium conversation, review, and contact history into a vector-store-ready

9 Tools
podium-pack Plugin
saas packs Category

Allowed Tools

ReadWriteEditBash(curl:*)Bash(jq:*)Bash(python3:*)Bash(sqlite3:*)Bash(gzip:*)Grep

Provided by Plugin

podium-pack

Claude Code skill pack for Podium (10 production-engineer skills)

saas packs v2.0.0
View Plugin

Installation

This skill is included in the podium-pack plugin:

/plugin install podium-pack@claude-code-plugins-plus

Click to copy

Instructions

Podium Conversation History Export

Overview

Bulk-export a Podium organization's historical conversations, reviews, and contacts into a corpus suitable for embedding into a vector store. This is the skill you run when the customer says "we have two years of knowledge in there" and the AI team wants every thread, every review, every contact note searchable by similarity. It is not a one-shot script — it is the full-export-plus-incremental-CDC pipeline that ingests the historical backlog once and then keeps the corpus current via nightly updated_at watermark passes.

The six production failures this skill prevents:

  1. Cursor pagination drift — Podium's next_cursor is server-side state derived from a sort key plus a position. If a conversation is created, updated, or deleted mid-export, naive cursor walks duplicate records (an updated row reappears at the new position) or skip records (a row deleted between pages shifts the cursor's anchor). A correct walk pins the sort to a stable monotonic field and dedups on id.
  2. Incremental CDC gaps via the updatedat watermark — naive updatedat > $watermark queries miss writes that happen at exactly the watermark second. Two writes within the same second on opposite sides of the boundary produce a permanent hole. Correct CDC uses >= with explicit overlap margin and dedups in the loader on (id, updated_at).
  3. Attachment URL expiry mid-download — Podium attachment URLs are pre-signed S3-style URLs that expire on the order of 15 minutes. A bulk exporter that takes an hour will get 403 SignatureDoesNotMatch on every attachment whose URL was issued in the first quarter of the run. Correct downloaders detect the 403, fetch a fresh signed URL by attachment_id, and resume.
  4. Oversized thread chunking failures — a 4000-message conversation thread (a long-running concierge thread for a high-touch RV dealer customer) blows the typical 8K-token embedding budget if naively concatenated. Chunking must be windowed with semantic boundaries (turn boundaries, day boundaries, idle gaps) and emit overlapping chunks for cross-window retrieval.
  5. PII not redacted before embedding — vector stores are effectively eternal; once a customer's SSN, credit-card number, or address is embedded it cannot be unembedded without recomputing the index. Redact at chunk-emit time with the same PII pattern set used by podium-call-transcript-pipeline, before any vector is computed.
  6. Export OOM on long threads — naively loading all messages of a 4000-message thread into memory before chunking blows the heap on the host running the export. Correct exports stream message-by-message into JSONL, then a separate pass streams JSONL into chunks. Memory cost stays O(window-size), not O(thread-size).

Prerequisites

  • Python 3.10+
  • podium-auth (this skill assumes a PodiumAuth instance is available; do not re-implement OAuth here)
  • podium-rate-limit-survival (this skill assumes the calling layer obeys per-endpoint quotas — bulk export is the most rate-limit-aggressive workload in the pack)
  • A persistent CDC watermark store — SQLite is the default; cdc_watermark.py ships with it
  • Local disk for streaming JSONL output, gzip-compressed (typical 2-year org: ~1–10 GB raw, 200 MB–2 GB gzipped)
  • An attachment-download target directory (S3 bucket, GCS bucket, or local path)
  • The PII redaction pattern set from podium-call-transcript-pipeline (reused; do not fork)

Authentication

This skill does NOT implement OAuth. All HTTP calls flow through a podiumget() injected dependency that holds a PodiumAuth instance from the podium-auth skill — that layer handles token caching, 80%-TTL refresh, single-flight locks, and scope validation. Bootstrap by Read-ing your refresh-token file, instantiate PodiumAuth(clientid, clientsecret, refreshtoken), then pass the instance to every export script via --refresh-token-file and the env-var credential flags. The five scripts in this skill never construct credentials in-process; they delegate.

If you need to harden the auth path itself (rotation, decay monitoring, multi-tenant routing), Read podium-auth/SKILL.md and stack that skill on top — this skill is the bulk-data layer, not the auth layer.

Instructions

Step 1 → Step 6 below. Build in this order. Each step neutralizes one of the six production failure modes from the Overview, in the same order.

Step 1. Cursor-paginated full crawl (neutralizes cursor drift)

Pin the sort to a stable monotonic field (created_at ascending) and dedup on id in the loader. Persist the cursor after every successful page so a crash mid-walk resumes at the last successful page boundary, not at the start.


import asyncio, json, time
from pathlib import Path
from typing import AsyncIterator

CURSOR_PATH = Path("./.cursor.conversations.json")
PAGE_SIZE = 100   # Podium documents 100 as the max; do not exceed

async def crawl_conversations(podium_get, location_uid: str) -> AsyncIterator[dict]:
    """Yield conversations in created_at-ascending order. Resumable across crashes."""
    state = json.loads(CURSOR_PATH.read_text()) if CURSOR_PATH.exists() else {}
    cursor = state.get("cursor")
    seen_ids = set(state.get("seen_ids", []))   # bounded; trim periodically

    while True:
        params = {
            "location_uid": location_uid,
            "sort": "created_at:asc",
            "limit": PAGE_SIZE,
        }
        if cursor:
            params["cursor"] = cursor

        resp = await podium_get("/v4/conversations", params=params)
        body = resp.json()

        page = body.get("data", [])
        for row in page:
            if row["id"] in seen_ids:
                continue                 # dedup against mid-walk updates
            seen_ids.add(row["id"])
            yield row

        cursor = body.get("next_cursor")
        # Persist after EVERY page — crash resume must land on the last good cursor
        CURSOR_PATH.write_text(json.dumps({
            "cursor": cursor,
            "seen_ids": list(seen_ids)[-50_000:],   # keep last 50k ids only
            "updated_at": time.time(),
        }))

        if not cursor:
            return

The seenids set is bounded at 50k to prevent unbounded memory growth on a multi-million-row export. Tune the cap to roughly 5× PAGESIZE × pagesinone_hour — large enough to dedup any reasonable update churn within the page-write window, small enough to fit in memory.

Step 2. Incremental CDC via overlap-margin watermark (neutralizes boundary gaps)

A naive updatedat > $watermark query misses any row whose updatedat is exactly the watermark second. Use >= and dedup in the loader; advance the watermark only after the page is fully persisted.


import sqlite3, time, json

WATERMARK_DB = "./watermarks.sqlite"

def get_watermark(resource: str) -> float:
    con = sqlite3.connect(WATERMARK_DB)
    cur = con.execute("SELECT watermark FROM cdc WHERE resource = ?", (resource,))
    row = cur.fetchone()
    con.close()
    return row[0] if row else 0.0

def advance_watermark(resource: str, new_watermark: float) -> None:
    con = sqlite3.connect(WATERMARK_DB)
    con.execute("""
        INSERT INTO cdc(resource, watermark, updated_at) VALUES(?, ?, ?)
        ON CONFLICT(resource) DO UPDATE SET watermark = excluded.watermark, updated_at = excluded.updated_at
    """, (resource, new_watermark, time.time()))
    con.commit()
    con.close()

async def incremental_pull(podium_get, resource: str, overlap_margin_s: int = 60):
    """Pull rows with updated_at >= (watermark - overlap_margin) and dedup."""
    watermark = get_watermark(resource)
    since = max(0, watermark - overlap_margin_s)        # explicit overlap

    cursor = None
    max_seen = watermark
    seen_keys = set()

    while True:
        params = {"updated_since": since, "sort": "updated_at:asc", "limit": 100}
        if cursor:
            params["cursor"] = cursor
        body = (await podium_get(f"/v4/{resource}", params=params)).json()

        for row in body.get("data", []):
            key = (row["id"], row["updated_at"])
            if key in seen_keys:
                continue
            seen_keys.add(key)
            yield row
            max_seen = max(max_seen, row["updated_at"])

        cursor = body.get("next_cursor")
        if not cursor:
            break

    # Advance watermark only after the full pass succeeds, not per-page —
    # a partial pass must re-pull from the previous watermark on retry.
    advance_watermark(resource, max_seen)

The overlapmargins = 60 is intentional. Podium's updated_at granularity is one second; a 60s overlap absorbs clock-skew and same-second writes without producing useful duplicate volume.

Step 3. Attachment download with refresh-on-403 (neutralizes pre-signed URL expiry)

Podium attachment URLs are pre-signed and expire ~15 minutes after they appear in the conversation payload. A multi-hour bulk export must re-fetch the signed URL by attachment ID when a download returns 403.


import asyncio, httpx
from pathlib import Path

async def download_attachment(podium_get, attachment_id: str, signed_url: str, dest: Path) -> None:
    async with httpx.AsyncClient(timeout=60) as c:
        r = await c.get(signed_url)
        if r.status_code == 403:
            # URL expired — fetch a fresh one
            fresh = (await podium_get(f"/v4/attachments/{attachment_id}")).json()
            signed_url = fresh["url"]
            r = await c.get(signed_url)
        r.raise_for_status()
        dest.write_bytes(r.content)

async def parallel_download(podium_get, attachments: list[dict], out_dir: Path, concurrency: int = 8) -> None:
    sem = asyncio.Semaphore(concurrency)
    async def _one(att):
        async with sem:
            dest = out_dir / f"{att['id']}{att.get('ext', '')}"
            await download_attachment(podium_get, att["id"], att["url"], dest)
    await asyncio.gather(*[_one(a) for a in attachments])

concurrency=8 is the safe default — Podium documents per-org rate limits, and the attachments are served from a CDN that tolerates moderate parallelism. Raise only after observing the integration's headroom against the rate-limit-survival skill's metrics.

Step 4. Windowed thread chunking with semantic boundaries (neutralizes oversized threads)

A long thread must chunk on natural boundaries — turn boundaries inside a conversation, idle gaps > 24h, and a hard cap on tokens per chunk. The chunker streams the JSONL export, never holding more than window_size messages in memory.


def chunk_thread(messages: list[dict], target_tokens: int = 1500, overlap_tokens: int = 200):
    """Yield chunks of ~target_tokens with overlap, breaking on idle gaps > 24h."""
    chunks = []
    current: list[dict] = []
    current_tokens = 0

    for i, msg in enumerate(messages):
        msg_tokens = approx_tokens(msg["body"])
        idle_gap = 0
        if i > 0:
            idle_gap = msg["created_at"] - messages[i-1]["created_at"]

        # Force-break on >24h idle OR token cap
        if current_tokens + msg_tokens > target_tokens or idle_gap > 86400:
            if current:
                chunks.append(_emit_chunk(current))
                # Overlap: carry last ~overlap_tokens of messages into next chunk
                carry = []
                carry_tokens = 0
                for m in reversed(current):
                    t = approx_tokens(m["body"])
                    if carry_tokens + t > overlap_tokens:
                        break
                    carry.insert(0, m)
                    carry_tokens += t
                current = carry
                current_tokens = carry_tokens
        current.append(msg)
        current_tokens += msg_tokens

    if current:
        chunks.append(_emit_chunk(current))
    return chunks

def approx_tokens(text: str) -> int:
    return max(1, len(text) // 4)   # OpenAI rule of thumb; replace with tiktoken in prod

The 24-hour idle-gap break is critical for support-style threads where a single thread spans months — without it, the semantic context of a chunk is incoherent (a Q1 issue and a Q4 follow-up share an embedding).

Step 5. PII redaction before embedding (neutralizes eternal-PII compliance failure)

Run the same redaction pattern set as podium-call-transcript-pipeline at chunk-emit time, before any text leaves the export process toward the embedding API. Redactions are non-recoverable — that is the point.


import re

PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[REDACTED_SSN]"),
    (re.compile(r"\b(?:\d[ -]*?){13,16}\b"), "[REDACTED_CARD]"),
    (re.compile(r"\b[A-Z]{1,2}\d{6,9}\b"), "[REDACTED_LICENSE]"),
    (re.compile(r"\b\d{1,5} [\w ]{1,40}(?:Street|St|Ave|Avenue|Rd|Road|Blvd|Drive|Dr|Lane|Ln|Way|Court|Ct)\b", re.I), "[REDACTED_ADDR]"),
    (re.compile(r"[\w\.-]+@[\w\.-]+\.\w+"), "[REDACTED_EMAIL]"),
    (re.compile(r"\+?\d{1,3}[ -.]?\(?\d{3}\)?[ -.]?\d{3}[ -.]?\d{4}"), "[REDACTED_PHONE]"),
]

def redact(text: str) -> str:
    for pat, repl in PII_PATTERNS:
        text = pat.sub(repl, text)
    return text

Run redaction on the chunk body, the customer-facing display name in any quoted message, and the attachment filename if it is reproduced in the chunk. Do not redact the message id or created_at — those are non-PII and necessary for retrieval traceability.

Step 6. Streaming JSONL output (neutralizes export OOM)

Stream one record per line, gzip-compressed, with no in-memory aggregation. The chunker reads JSONL line-by-line and emits chunks; the embedder reads chunks line-by-line and emits vectors. Memory cost is O(one record) at every stage.


import gzip, json
from pathlib import Path

async def stream_export(rows_iter, out_path: Path) -> int:
    """Write rows to a gzip-compressed JSONL file, one record per line."""
    count = 0
    with gzip.open(out_path, "wt", encoding="utf-8") as f:
        async for row in rows_iter:
            f.write(json.dumps(row, separators=(",", ":")))
            f.write("\n")
            count += 1
            if count % 1000 == 0:
                f.flush()   # bound data loss on crash
    return count

separators=(",", ":") saves ~5% on disk; f.flush() every 1000 rows bounds the data-loss window on a crash to one flush interval.

Error Handling

HTTP Status Podium Error Root Cause Action
403 Forbidden SignatureDoesNotMatch (attachment) Pre-signed URL expired Re-fetch via GET /v4/attachments/:attachment_id, retry once
404 Not Found conversationnotfound Conversation deleted mid-export Skip; log the id; do not retry
409 Conflict cursor_invalid Cursor from a previous export run is no longer valid Drop cursor; restart from current watermark
429 Too Many Requests rate_limited Page rate exceeded org quota Honor Retry-After; delegate to podium-rate-limit-survival
500/502/503 server_error Podium-side transient Exponential backoff with jitter, max 4 attempts
(local) watermark_drift CDC watermark advanced past max_seen on a partial run Reset to last good watermark via cdc_watermark.py --reset

Examples

Full historical export (one-shot)


python3 scripts/export_conversations.py \
  --location-uid "{your-location-uid}" \
  --mode full \
  --out ./exports/conversations.jsonl.gz \
  --attachments-dir ./exports/attachments

Incremental nightly sync


# Run nightly via cron. Watermark advances after success; partial runs re-pull.
python3 scripts/export_conversations.py \
  --location-uid "{your-location-uid}" \
  --mode incremental \
  --watermark-db ./watermarks.sqlite \
  --overlap-margin-seconds 60 \
  --out ./exports/conversations.$(date +%F).jsonl.gz

Inspect / reset / advance the CDC watermark


# Inspect
python3 scripts/cdc_watermark.py --resource conversations --db ./watermarks.sqlite

# Reset (re-pull everything from epoch)
python3 scripts/cdc_watermark.py --resource conversations --db ./watermarks.sqlite --reset

# Advance manually (e.g., after a manual backfill via another tool)
python3 scripts/cdc_watermark.py --resource conversations --db ./watermarks.sqlite --advance 1715212800

Chunk an export for embedding


python3 scripts/chunk_for_embedding.py \
  --input ./exports/conversations.jsonl.gz \
  --output ./exports/chunks.jsonl.gz \
  --target-tokens 1500 \
  --overlap-tokens 200 \
  --redact-pii

Output is JSONL where each line is a chunk: {chunkid, sourceid, sourcetype, createdatwindow, body, tokenestimate, pii_redacted: true}. Pipe directly into your vector-store loader.

Parallel attachment download with refresh-on-403


python3 scripts/attachment_downloader.py \
  --input ./exports/conversations.jsonl.gz \
  --out-dir ./exports/attachments \
  --concurrency 8 \
  --refresh-on-403

Output

  • Streaming gzip-compressed JSONL export of conversations, reviews, contacts
  • CDC watermark SQLite store with cdc_watermark.py inspector
  • Attachment directory with all attachments downloaded, retried-on-403
  • Chunked JSONL with PII redacted, ready for vector-store ingestion
  • Cursor checkpoint files (resumable across crashes)

Resources

Ready to use podium-pack?