hubspot-warehouse-sync
Sync HubSpot CRM data to a data warehouse (BigQuery, Snowflake, or Postgres) for analytics and reporting. Covers initial backfill of millions of records under the 500K/day rate limit, incremental CDC polling via hs_lastmodifieddate, schema-drift detection with ALTER TABLE generation, association sync for contacts/deals/companies, and idempotent upsert patterns that prevent duplicate rows on retry. Use when building a HubSpot → warehouse pipeline, resyncing after a schema change, debugging duplicate rows or missing CDC updates, or recovering from a rate-limit burnout mid-backfill. Trigger with "hubspot warehouse sync", "hubspot bigquery", "hubspot snowflake", "hubspot postgres etl", "hubspot backfill", "hubspot cdc", "hubspot schema drift", "hubspot duplicate rows", "hubspot to data warehouse".
Allowed Tools
Provided by Plugin
hubspot-pack
Claude Code skill pack for HubSpot (10 production-engineer skills)
Installation
This skill is included in the hubspot-pack plugin:
/plugin install hubspot-pack@claude-code-plugins-plus
Instructions
HubSpot Warehouse Sync
Overview
Move HubSpot CRM data to BigQuery, Snowflake, or Postgres in a way that survives production — not just the demo. This is not a connector walkthrough. It is the extraction and load code your pipeline runs when a 2M-contact backfill burns through the 500K daily call quota at noon, when CDC misses three days of deal updates because association changes do not update hs_lastmodifieddate, when a portal admin adds a custom property and your warehouse table schema silently drifts, and when a network timeout at record 45,000 causes your retry to insert 100 duplicate rows.
The six production failures this skill prevents:
- Backfill exhausting the daily rate limit before completing — 2M contacts at 100 records/call is 20,000 calls. At 100 calls/10s the math says 33 minutes, but that burns your entire 500K daily quota before noon and takes every other integration down with it. Token bucket rate limiting with a configurable daily ceiling is non-optional.
- CDC missing association changes — HubSpot's hs_lastmodifieddate is updated when any property on the contact record changes, but not when an association is created or deleted. A contact-to-deal link added by a sales rep is invisible to a lastmodifieddate-based incremental poll. Association CDC requires a separate poll strategy.
- Schema drift causing silent extraction failures — when a portal admin adds or removes a custom property, the warehouse table schema and the extraction property list become misaligned. New properties are dropped on the floor. Removed properties cause KeyError on row construction. Neither failure raises an alarm without explicit schema validation.
- Duplicate rows on batch retry — a network failure mid-batch causes the batch to be re-sent. Without a proper upsert key the warehouse gets duplicate rows that are invisible until an analyst notices double-counted revenue. The correct upsert key for contacts is id (HubSpot object ID), not a composite of name/email.
- Large payload failures on associated object inline pulls — contacts with thousands of engagement records cause payload sizes that exceed HTTP response limits when associations are pulled inline. Associations must be fetched in a separate batch read pass.
- Timezone inconsistency in aggregations — HubSpot stores all timestamps as Unix milliseconds in UTC. If the warehouse session timezone is set to a local timezone, DATE(created_at) aggregations produce different daily totals depending on where the analyst runs the query.
Prerequisites
- Python 3.10+
- HubSpot private app token with scopes: crm.objects.contacts.read, crm.objects.companies.read, crm.objects.deals.read, crm.associations.read, crm.schemas.contacts.read
- Target warehouse credentials:
  - BigQuery: service account JSON with bigquery.dataEditor role
  - Snowflake: user/password or key-pair auth, warehouse + database + schema
  - Postgres: connection string with write access to the target schema
- Python packages: requests, pandas, pyarrow, google-cloud-bigquery (BigQuery) or snowflake-connector-python (Snowflake) or psycopg2-binary (Postgres)
- A secret store the runtime can read (env vars, GCP Secret Manager, AWS Secrets Manager)
Instructions
Build in this order. Each section neutralizes one production failure mode.
1. Token bucket rate limiter (neutralizes backfill quota burnout)
Naive extraction loops call the API as fast as possible. At 100 calls/10s with 20K calls needed, you burn 200K of your 500K daily quota in 33 minutes — and that is before any CDC polling or webhook processing. A production backfill must budget its calls over the full day so other integrations still have headroom.
The token bucket strategy: set a daily_budget ceiling below the account limit (e.g., 400K of 500K), compute how many calls per second that allows over 24 hours, and enforce a minimum interval between calls. Burn-rate is checked against live rate-limit headers on every response.
```python
import time
import threading
from dataclasses import dataclass, field


class DailyBudgetExhausted(Exception):
    pass


@dataclass
class TokenBucket:
    """
    Token bucket rate limiter for HubSpot API calls.

    daily_budget: max calls to spend per 24h window (stay below 500K limit)
    burst_limit: max calls per 10s window (100 for most tiers)
    """
    daily_budget: int = 400_000
    burst_limit: int = 95  # leave 5 calls headroom per 10s window
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _calls_today: int = 0
    _window_calls: int = 0
    _window_start: float = field(default_factory=time.monotonic)
    _day_start: float = field(default_factory=time.monotonic)

    def acquire(self) -> None:
        with self._lock:
            now = time.monotonic()
            # Reset daily counter
            if now - self._day_start >= 86_400:
                self._calls_today = 0
                self._day_start = now
            # Reset 10s window counter
            if now - self._window_start >= 10.0:
                self._window_calls = 0
                self._window_start = now
            # Hard stop if daily budget exhausted — do not burn other integrations
            if self._calls_today >= self.daily_budget:
                seconds_left = 86_400 - (now - self._day_start)
                raise DailyBudgetExhausted(
                    f"Daily call budget of {self.daily_budget:,} reached. "
                    f"Resuming in {seconds_left / 3600:.1f}h."
                )
            # Burst window throttle — sleep until window resets if full
            if self._window_calls >= self.burst_limit:
                sleep_s = 10.0 - (now - self._window_start) + 0.1
                time.sleep(max(0, sleep_s))
                self._window_calls = 0
                self._window_start = time.monotonic()
            self._calls_today += 1
            self._window_calls += 1

    def update_from_headers(self, headers: dict) -> None:
        """Adjust pacing from live rate-limit headers on each response."""
        remaining = int(headers.get("X-HubSpot-RateLimit-Daily-Remaining", self.daily_budget))
        if remaining < 50_000:
            # Emergency throttle: burn rate is too high, cut burst limit in half
            with self._lock:
                self.burst_limit = max(10, self.burst_limit // 2)


RATE_LIMITER = TokenBucket()
```
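To sanity-check the pacing before a run, the budget arithmetic from the overview can be computed directly. This is a standalone sketch; the 2M-contact and 400K-budget figures come from the text above:

```python
import math

# Backfill sizing: 2M contacts at 100 records per search call
contacts = 2_000_000
calls_needed = math.ceil(contacts / 100)   # 20,000 calls

# Unthrottled: 100 calls per 10s window
unthrottled_minutes = calls_needed / 100 * 10 / 60

# Budgeted: spread a 400K daily ceiling evenly over 24h,
# leaving the other 380K of quota for CDC, webhooks, and other integrations
daily_budget = 400_000
min_interval_s = 86_400 / daily_budget     # seconds between calls
backfill_hours = calls_needed * min_interval_s / 3600

print(f"{calls_needed:,} calls; unthrottled ≈ {unthrottled_minutes:.0f} min; "
      f"budgeted ≈ 1 call per {min_interval_s:.2f}s ≈ {backfill_hours:.1f}h")
```

The budgeted pace stretches the 33-minute burst into roughly an hour while keeping 95% of the daily quota free.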
2. Full backfill with cursor pagination (contacts search API)
The search API (POST /crm/v3/objects/contacts/search) supports cursor pagination via after. It returns a maximum of 100 records per page and up to 10,000 records total per search. For tables larger than 10K records, use the lastmodifieddate range-slicing strategy: paginate within 30-day windows, sliding forward from the earliest hs_createdate in the portal.
```python
import json
import time

import requests

BASE_URL = "https://api.hubapi.com"


def fetch_contacts_page(
    token: str,
    after: str | None,
    properties: list[str],
    rate_limiter: TokenBucket,
) -> tuple[list[dict], str | None]:
    """Fetch one page of contacts. Returns (records, next_cursor)."""
    rate_limiter.acquire()
    body = {
        "limit": 100,
        "properties": properties,
        "sorts": [{"propertyName": "hs_lastmodifieddate", "direction": "ASCENDING"}],
    }
    if after:
        body["after"] = after
    resp = requests.post(
        f"{BASE_URL}/crm/v3/objects/contacts/search",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json=body,
        timeout=30,
    )
    rate_limiter.update_from_headers(dict(resp.headers))
    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 10))
        time.sleep(retry_after)
        # Retry after the server-specified delay; the limiter gates the retry too
        return fetch_contacts_page(token, after, properties, rate_limiter)
    resp.raise_for_status()
    data = resp.json()
    results = data.get("results", [])
    next_cursor = data.get("paging", {}).get("next", {}).get("after")
    return results, next_cursor


def backfill_contacts(
    token: str,
    properties: list[str],
    rate_limiter: TokenBucket,
    checkpoint_file: str = "/tmp/hubspot_backfill_checkpoint.json",
):
    """
    Full backfill with checkpoint. Resume-safe: reads cursor from checkpoint_file
    so a mid-run failure restarts from the last completed page, not from zero.
    Yields pages of records; the caller handles the warehouse write.
    """
    # Load checkpoint
    try:
        with open(checkpoint_file) as f:
            checkpoint = json.load(f)
        after = checkpoint.get("after")
        total = checkpoint.get("total", 0)
        print(f"Resuming backfill from cursor {after}, {total:,} records already written")
    except FileNotFoundError:
        after = None
        total = 0
    while True:
        records, next_cursor = fetch_contacts_page(token, after, properties, rate_limiter)
        if not records:
            break
        yield records  # caller handles warehouse write
        total += len(records)
        after = next_cursor
        # Write checkpoint after every page so a restart costs at most 100 records
        with open(checkpoint_file, "w") as f:
            json.dump({"after": after, "total": total}, f)
        if not next_cursor:
            break
    print(f"Backfill complete: {total:,} contacts")
```
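The 30-day window slicing described above — needed because the search API caps any one search at 10K results — can be sketched as a standalone generator. The function name and the millisecond arithmetic are illustrative assumptions, not HubSpot API surface:

```python
def lastmodified_windows(start_ms: int, end_ms: int, window_days: int = 30):
    """Yield (window_start_ms, window_end_ms) slices covering [start_ms, end_ms).

    Each window is paginated independently with the search API so every slice
    stays under the 10K-results-per-search cap. Shrink window_days for portals
    where a single 30-day window still exceeds 10K modified records.
    """
    window_ms = window_days * 86_400_000  # days → Unix milliseconds
    cursor = start_ms
    while cursor < end_ms:
        yield cursor, min(cursor + window_ms, end_ms)
        cursor += window_ms
```

Each yielded pair becomes a filterGroup with GTE/LT filters on hs_lastmodifieddate, reusing the request body shape from fetch_contacts_page.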
3. Incremental CDC poll (neutralizes missed updates — with association gap)
The standard CDC pattern polls on hs_lastmodifieddate > last_run. This catches property changes but silently misses association changes (linking a contact to a deal, or removing that link, does not update hs_lastmodifieddate). The fix is a two-pass incremental: a property poll and a separate association poll on a shorter interval.
```python
def incremental_contacts_since(
    token: str,
    since_ms: int,
    properties: list[str],
    rate_limiter: TokenBucket,
) -> list[dict]:
    """
    CDC property poll: return all contacts modified after since_ms (Unix ms UTC).
    Does NOT cover association changes — run poll_association_changes() separately.
    """
    all_records = []
    after = None
    while True:
        rate_limiter.acquire()
        body = {
            "limit": 100,
            "properties": properties,
            "filterGroups": [{
                "filters": [{
                    "propertyName": "hs_lastmodifieddate",
                    "operator": "GT",
                    "value": str(since_ms),
                }]
            }],
            "sorts": [{"propertyName": "hs_lastmodifieddate", "direction": "ASCENDING"}],
        }
        if after:
            body["after"] = after
        resp = requests.post(
            f"{BASE_URL}/crm/v3/objects/contacts/search",
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json=body,
            timeout=30,
        )
        rate_limiter.update_from_headers(dict(resp.headers))
        resp.raise_for_status()
        data = resp.json()
        all_records.extend(data.get("results", []))
        next_cursor = data.get("paging", {}).get("next", {}).get("after")
        if not next_cursor:
            break
        after = next_cursor
    return all_records


def poll_association_changes(
    token: str,
    contact_ids: list[str],
    rate_limiter: TokenBucket,
) -> dict[str, list[str]]:
    """
    Fetch current contact → deal associations for a list of contact IDs.
    Use this to detect additions and deletions that hs_lastmodifieddate misses.
    Strategy: compare fetched associations against the warehouse snapshot.
    Differences = changes that must be written.
    Returns {contact_id: [deal_id, ...]}
    """
    # Batch read associations: up to 100 contacts per call (v4 associations API)
    result = {}
    for chunk_start in range(0, len(contact_ids), 100):
        chunk = contact_ids[chunk_start:chunk_start + 100]
        rate_limiter.acquire()
        resp = requests.post(
            f"{BASE_URL}/crm/v4/associations/contacts/deals/batch/read",
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={"inputs": [{"id": cid} for cid in chunk]},
            timeout=30,
        )
        rate_limiter.update_from_headers(dict(resp.headers))
        resp.raise_for_status()
        for item in resp.json().get("results", []):
            from_id = str(item["from"]["id"])
            to_ids = [str(a["toObjectId"]) for a in item.get("to", [])]
            result[from_id] = to_ids
    return result
```
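The snapshot comparison described in the docstring can be sketched as a pure function. The name diff_associations and the (contact_id, deal_id) tuple shape are illustrative assumptions, not part of the HubSpot API:

```python
def diff_associations(
    current: dict[str, list[str]],
    snapshot: dict[str, list[str]],
) -> tuple[set[tuple[str, str]], set[tuple[str, str]]]:
    """Compare live associations (from poll_association_changes) against the
    warehouse snapshot. Returns (added_pairs, removed_pairs) as
    (contact_id, deal_id) tuples: the rows to insert into, and delete from,
    the junction table."""
    current_pairs = {(c, d) for c, deals in current.items() for d in deals}
    snapshot_pairs = {(c, d) for c, deals in snapshot.items() for d in deals}
    return current_pairs - snapshot_pairs, snapshot_pairs - current_pairs


# Contact 1 gained deal "b"; contact 2 lost deal "d"
added, removed = diff_associations(
    {"1": ["a", "b"], "2": ["c"]},
    {"1": ["a"], "2": ["c", "d"]},
)
```

Because both sides are flattened to pair sets, additions and deletions fall out of two set differences, which is what makes lastmodifieddate-invisible link changes detectable.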
4. Schema drift detection and ALTER TABLE generation
When a portal admin adds or removes a custom property, your warehouse table schema diverges from the API response silently. The correct mitigation is a pre-run schema check: enumerate all HubSpot properties via the properties API, diff against the warehouse column list, and emit ALTER TABLE statements for any new columns. Removed properties become nullable and are not dropped (dropping columns in production is a separate review).
```python
def fetch_hubspot_property_schema(token: str) -> dict[str, str]:
    """
    Returns {property_name: warehouse_type} for all contact properties.
    Maps HubSpot field types to warehouse-appropriate column types.
    """
    resp = requests.get(
        f"{BASE_URL}/crm/v3/properties/contacts",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    resp.raise_for_status()
    # Column types shown are BigQuery's; substitute the equivalents
    # (e.g. FLOAT / DOUBLE PRECISION) for Snowflake or Postgres targets
    TYPE_MAP = {
        "string": "TEXT",
        "number": "FLOAT64",
        "date": "DATE",
        "datetime": "TIMESTAMP",
        "bool": "BOOLEAN",
        "enumeration": "TEXT",
        "phone_number": "TEXT",
        "json": "TEXT",  # store complex types as serialized JSON
    }
    schema = {}
    for prop in resp.json().get("results", []):
        hs_type = prop.get("type", "string")
        schema[prop["name"]] = TYPE_MAP.get(hs_type, "TEXT")
    return schema


def detect_schema_drift(
    hubspot_schema: dict[str, str],
    warehouse_columns: set[str],
) -> tuple[dict[str, str], set[str]]:
    """
    Returns:
      added: {column_name: type} — in HubSpot but not warehouse
      removed: {column_name} — in warehouse but not HubSpot
    """
    hs_columns = set(hubspot_schema.keys())
    added = {k: v for k, v in hubspot_schema.items() if k not in warehouse_columns}
    removed = warehouse_columns - hs_columns - {"_synced_at", "_sync_run_id"}  # exclude meta cols
    return added, removed


def generate_alter_statements(table: str, added: dict[str, str]) -> list[str]:
    """Generate ALTER TABLE ADD COLUMN statements for new HubSpot properties."""
    stmts = []
    for col, dtype in added.items():
        safe_col = col.replace("-", "_").lower()
        stmts.append(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {safe_col} {dtype};")
    return stmts
```
5. Warehouse upsert patterns (neutralizes duplicate rows on retry)
Each warehouse requires a different upsert idiom. The upsert key is always id — HubSpot's immutable object ID. Never composite on mutable fields like email or name; those can change and cause phantom duplicates.
All three patterns below use a staging table approach: load new rows into a temp table, then merge into production. This is the most portable pattern across warehouse engines.
```python
import pandas as pd


def build_contacts_dataframe(records: list[dict], properties: list[str]) -> pd.DataFrame:
    """
    Normalize HubSpot search API results into a flat DataFrame.

    Key transformations:
    - All timestamps parsed as UTC (avoids timezone-at-load inconsistency)
    - Nested properties dict flattened to columns
    - Missing properties filled with None (not 0 or empty string)
    - _synced_at appended so warehouse always has the load timestamp
    """
    rows = []
    for r in records:
        row = {"id": r["id"]}
        props = r.get("properties", {})
        for p in properties:
            val = props.get(p)
            # HubSpot timestamps are Unix ms UTC — parse explicitly
            if val and p.endswith("date") and val.isdigit():
                row[p] = pd.to_datetime(int(val), unit="ms", utc=True)
            else:
                row[p] = val
        row["_synced_at"] = pd.Timestamp.now(tz="UTC")
        rows.append(row)
    return pd.DataFrame(rows)
```
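The Unix-ms-UTC rule above can be verified without pandas; a stdlib sketch of the same parse (the helper name parse_hubspot_ms is illustrative):

```python
from datetime import datetime, timezone


def parse_hubspot_ms(value: str) -> datetime:
    """Parse a HubSpot Unix-millisecond timestamp string into an aware UTC datetime."""
    return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)


ts = parse_hubspot_ms("1700000000000")
print(ts.isoformat())  # 2023-11-14T22:13:20+00:00
```

Parsing with an explicit tz=timezone.utc is what keeps DATE() aggregations stable regardless of the warehouse session timezone.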
BigQuery — MERGE upsert
```python
from google.cloud import bigquery


def upsert_to_bigquery(
    df: pd.DataFrame,
    project: str,
    dataset: str,
    table: str,
    bq_client: bigquery.Client,
) -> int:
    staging_table = f"{project}.{dataset}.{table}_staging"
    target_table = f"{project}.{dataset}.{table}"
    # Write staging
    job = bq_client.load_table_from_dataframe(
        df,
        staging_table,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
    )
    job.result()
    # MERGE into target — upsert on id
    merge_sql = f"""
    MERGE `{target_table}` T
    USING `{staging_table}` S
    ON T.id = S.id
    WHEN MATCHED THEN
      UPDATE SET {', '.join(f'T.{c} = S.{c}' for c in df.columns if c != 'id')}
    WHEN NOT MATCHED THEN
      INSERT ({', '.join(df.columns)})
      VALUES ({', '.join(f'S.{c}' for c in df.columns)})
    """
    bq_client.query(merge_sql).result()
    return len(df)
```
Snowflake — MERGE upsert
```python
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas


def upsert_to_snowflake(
    df: pd.DataFrame,
    conn: snowflake.connector.SnowflakeConnection,
    schema: str,
    table: str,
) -> int:
    staging = f"{schema}.{table}_staging"
    target = f"{schema}.{table}"
    cursor = conn.cursor()
    # Stage rows via write_pandas (Parquet upload + COPY INTO under the hood)
    cursor.execute(f"CREATE OR REPLACE TEMP TABLE {staging} LIKE {target}")
    success, nchunks, nrows, _ = write_pandas(
        conn, df, f"{table}_staging", schema=schema, auto_create_table=False
    )
    set_cols = ", ".join(f"t.{c} = s.{c}" for c in df.columns if c != "id")
    ins_cols = ", ".join(df.columns)
    ins_vals = ", ".join(f"s.{c}" for c in df.columns)
    cursor.execute(f"""
        MERGE INTO {target} t
        USING {staging} s ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET {set_cols}
        WHEN NOT MATCHED THEN INSERT ({ins_cols}) VALUES ({ins_vals})
    """)
    return nrows
```
Postgres — INSERT ... ON CONFLICT upsert
```python
import psycopg2
from psycopg2.extras import execute_values


def upsert_to_postgres(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    schema: str,
    table: str,
) -> int:
    cols = list(df.columns)
    set_expr = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c != "id")
    sql = f"""
        INSERT INTO {schema}.{table} ({', '.join(cols)})
        VALUES %s
        ON CONFLICT (id) DO UPDATE SET {set_expr}
    """
    rows = [tuple(r) for r in df.itertuples(index=False, name=None)]
    with conn.cursor() as cur:
        execute_values(cur, sql, rows)
    conn.commit()
    return len(rows)
```
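To see why keying the upsert on id makes a batch retry idempotent, here is a warehouse-free simulation — a plain dict stands in for the target table, and the helper name upsert_rows is purely illustrative:

```python
def upsert_rows(table: dict[str, dict], batch: list[dict]) -> dict[str, dict]:
    """Merge a batch into an id-keyed 'table': update on match, insert otherwise."""
    for row in batch:
        table[row["id"]] = {**table.get(row["id"], {}), **row}
    return table


table: dict[str, dict] = {}
batch = [{"id": "101", "email": "a@x.com"}, {"id": "102", "email": "b@x.com"}]
upsert_rows(table, batch)
upsert_rows(table, batch)  # retry of the same batch after a network timeout
assert len(table) == 2     # no duplicate rows — the retry is a no-op
```

A plain INSERT would have produced four rows here; the MERGE / ON CONFLICT patterns above collapse the retry the same way the dict key does.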
6. Large association payloads (separate fetch pass)
Never pull associations inline in the contacts search response. A contact with 500 engagement records produces a response payload measured in megabytes that times out or exceeds HTTP limits. Fetch associations in a second pass using the batch associations endpoint, keyed on the contact IDs from step 2/3.
The implementation is in poll_association_changes() above. The rule is: contacts page first, IDs collected, associations fetched as a second batch read. Write associations to a separate hubspot_contact_deal_associations table — not as columns on the contacts table.
Error Handling
| HTTP Status | Error | Root Cause | Action |
|---|---|---|---|
| 429 TOO_MANY_REQUESTS | RATE_LIMIT | Burst or daily quota exhausted | Read Retry-After header; sleep; check daily remaining |
| 400 BAD_REQUEST | INVALID_FILTER_VALUE | hs_lastmodifieddate filter value is not a valid Unix ms string | Cast since_ms to str before inserting into filter body |
| 400 BAD_REQUEST | INVALID_OFFSET | Cursor after value is stale (>7 days for search API) | Discard checkpoint; restart backfill from beginning of window |
| 400 BAD_REQUEST | Max associations per request exceeded | Sent more than 100 IDs to batch associations endpoint | Chunk input list to 100 before calling |
| 401 UNAUTHORIZED | INVALID_AUTHENTICATION | Token expired or revoked | Rotate or refresh token before resuming |
| 403 FORBIDDEN | MISSING_SCOPES | crm.schemas.contacts.read not granted | Add scope in Private Apps settings |
| 413 PAYLOAD_TOO_LARGE | — | Property list too long for a single search call | Request properties in batches of 50, merge results |
| 500 INTERNAL_ERROR | — | Transient HubSpot server error | Retry with exponential backoff (max 4 attempts) |
| 504 GATEWAY_TIMEOUT | — | Response payload too large or HubSpot overloaded | Reduce page size to 50; add 2s delay between calls |
| DailyBudgetExhausted (local) | — | Token bucket daily ceiling hit | Pause extraction until midnight UTC; alert on-call |
Examples
Minimal end-to-end backfill to BigQuery
```shell
python3 - <<'EOF'
import os
from google.cloud import bigquery
# See implementation-guide.md for full script with retry, schema sync, and checkpoint
from hubspot_sync import (
    TokenBucket,
    backfill_contacts,
    build_contacts_dataframe,
    upsert_to_bigquery,
)

TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"]
PROJECT = os.environ["GCP_PROJECT"]
DATASET = "hubspot_raw"
TABLE = "contacts"

bq = bigquery.Client(project=PROJECT)
limiter = TokenBucket(daily_budget=400_000)
props = ["email", "firstname", "lastname", "hs_lastmodifieddate", "lifecyclestage"]

for page in backfill_contacts(TOKEN, props, limiter):
    df = build_contacts_dataframe(page, props)
    upsert_to_bigquery(df, PROJECT, DATASET, TABLE, bq)
EOF
```
Inspect current rate-limit headroom before starting a backfill
curl -s -I "https://api.hubapi.com/crm/v3/objects/contacts?limit=1" \
-H "Authorization: Bearer $HUBSPOT_ACCESS_TOKEN" \
| grep -i "X-HubSpot-RateLimit"
Enumerate all contact properties (for schema sync)
curl -s "https://api.hubapi.com/crm/v3/properties/contacts" \
-H "Authorization: Bearer $HUBSPOT_ACCESS_TOKEN" \
| jq '[.results[] | {name: .name, type: .type, label: .label}]' \
| head -60
Run CDC since last successful run (stored in a state table)
```shell
python3 - <<'EOF'
import os
from hubspot_sync import incremental_contacts_since, TokenBucket

TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"]
SINCE_MS = 1_700_000_000_000  # replace with value from state table

limiter = TokenBucket()
records = incremental_contacts_since(TOKEN, SINCE_MS, ["email", "lifecyclestage"], limiter)
print(f"CDC returned {len(records)} changed contacts")
EOF
```
Output
- Token bucket rate limiter configured to stay within daily budget with live header feedback
- Full backfill loop with checkpoint file for resume-safe restarts
- CDC property poll (contacts modified since last run) with filter on hs_lastmodifieddate
- Association CDC via separate batch associations poll, writing to a separate junction table
- Schema drift detection comparing HubSpot property API response against warehouse column list
- ALTER TABLE statements for new properties, no auto-drop for removed properties
- pandas DataFrame builder with UTC timestamp normalization
- BigQuery MERGE, Snowflake MERGE, and Postgres INSERT ... ON CONFLICT upsert patterns
- Monitoring log lines at every rate-limit threshold and schema drift event
Resources
- HubSpot CRM Search API
- HubSpot Batch Read Contacts
- HubSpot v4 Associations API
- HubSpot Properties API
- HubSpot Rate Limits Reference
- API_REFERENCE.md — search API cursor shapes, batch read request/response, CDC fields, rate-limit headers, association batch format
- implementation-guide.md — complete backfill script with token bucket, full CDC poll loop, schema drift handler, ALTER TABLE generator, all three warehouse upsert implementations