navan-data-sync
Implement incremental sync strategies for Navan BOOKING and TRANSACTION data with ETL pipeline patterns. Use when setting up production data pipelines, debugging sync drift, or adding real-time event processing. Trigger with "navan data sync", "navan incremental sync", "navan ETL pipeline".
Allowed Tools
Provided by Plugin
navan-pack
Claude Code skill pack for Navan (24 skills)
Installation
This skill is included in the navan-pack plugin:
/plugin install navan-pack@claude-code-plugins-plus
Navan — Data Sync
Overview
This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tiers — scheduled full-refresh, incremental watermark-based sync, and real-time webhooks — along with Airbyte connector configuration and idempotent SQL upsert patterns.
Prerequisites
- Navan account with OAuth 2.0 API credentials (see navan-install-auth)
- Destination warehouse (Snowflake, BigQuery, PostgreSQL, or Redshift)
- For managed sync: Airbyte instance (Cloud or OSS) with source-navan v0.0.42+
- For webhooks: publicly accessible HTTPS endpoint for callbacks
- Node.js 18+ or Python 3.8+
- Environment variables: NAVAN_CLIENT_ID, NAVAN_CLIENT_SECRET, NAVAN_BASE_URL (plus NAVAN_WEBHOOK_SECRET for the webhook endpoint)
Instructions
Step 1: Full-Refresh Sync for BOOKING Table
The BOOKING table is re-imported weekly by Navan. Every record is refreshed, so your sync must use merge-upsert logic to avoid duplicates while capturing updates.
```typescript
// Authenticate with OAuth 2.0 client credentials (top-level await requires an ES module)
const tokenRes = await fetch(`${process.env.NAVAN_BASE_URL}/authenticate`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    client_id: process.env.NAVAN_CLIENT_ID,
    client_secret: process.env.NAVAN_CLIENT_SECRET,
  }),
});
if (!tokenRes.ok) throw new Error(`Authentication failed: HTTP ${tokenRes.status}`);
const { access_token } = await tokenRes.json();
const headers = { Authorization: `Bearer ${access_token}` };

// Full extraction: no date filter for the weekly refresh
const bookingsRes = await fetch(`${process.env.NAVAN_BASE_URL}/get_admin_trips`, { headers });
if (!bookingsRes.ok) throw new Error(`Booking extraction failed: HTTP ${bookingsRes.status}`);
const bookings = await bookingsRes.json();
console.log(`Extracted ${bookings.length} bookings for full refresh`);
```
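If the full extraction times out (the HTTP 504 row in the error table recommends 30-day windows), the pull can be split by date range. The sketch below only computes the windows; dateWindows and DateWindow are hypothetical names, and it assumes you have a date-filtered endpoint to feed them to. The booking call above takes no date parameters, so confirm the filter names against the Navan API before wiring this in.

```typescript
// Hypothetical helper: split [start, end] (inclusive, YYYY-MM-DD) into consecutive,
// non-overlapping windows of at most `days` days each.
interface DateWindow {
  start: string; // YYYY-MM-DD, inclusive
  end: string; // YYYY-MM-DD, inclusive
}

function dateWindows(start: string, end: string, days = 30): DateWindow[] {
  const DAY_MS = 24 * 60 * 60 * 1000;
  const toDate = (s: string) => new Date(`${s}T00:00:00Z`); // parse as UTC midnight
  const fmt = (d: Date) => d.toISOString().slice(0, 10);
  const last = toDate(end).getTime();
  const windows: DateWindow[] = [];
  for (let t = toDate(start).getTime(); t <= last; t += days * DAY_MS) {
    // Each window covers `days` calendar days, clipped to the overall end date
    const windowEnd = Math.min(t + (days - 1) * DAY_MS, last);
    windows.push({ start: fmt(new Date(t)), end: fmt(new Date(windowEnd)) });
  }
  return windows;
}

console.log(dateWindows('2025-01-01', '2025-03-15'));
```

Working in UTC milliseconds keeps the arithmetic free of daylight-saving shifts; an empty array comes back when start is after end.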
SQL merge-upsert pattern (PostgreSQL):
```sql
-- Staging table receives raw API data; truncate it before each weekly load
CREATE TABLE IF NOT EXISTS navan_booking_staging (
  uuid           TEXT PRIMARY KEY,
  traveler_email TEXT,
  origin         TEXT,
  destination    TEXT,
  start_date     DATE,
  end_date       DATE,
  total_cost     NUMERIC(12,2),
  currency       TEXT DEFAULT 'USD',
  department     TEXT,
  cost_center    TEXT,
  status         TEXT,
  in_policy      BOOLEAN,
  created_at     TIMESTAMPTZ,
  updated_at     TIMESTAMPTZ,
  synced_at      TIMESTAMPTZ DEFAULT NOW()
);

-- Target table: same columns, column order, and primary key as staging
CREATE TABLE IF NOT EXISTS navan_booking (LIKE navan_booking_staging INCLUDING ALL);

-- Merge-upsert: insert new records, update records whose updated_at moved forward
INSERT INTO navan_booking AS b
SELECT * FROM navan_booking_staging
ON CONFLICT (uuid) DO UPDATE SET
  traveler_email = EXCLUDED.traveler_email,
  origin         = EXCLUDED.origin,
  destination    = EXCLUDED.destination,
  start_date     = EXCLUDED.start_date,
  end_date       = EXCLUDED.end_date,
  total_cost     = EXCLUDED.total_cost,
  currency       = EXCLUDED.currency,
  department     = EXCLUDED.department,
  cost_center    = EXCLUDED.cost_center,
  status         = EXCLUDED.status,
  in_policy      = EXCLUDED.in_policy,
  updated_at     = EXCLUDED.updated_at,
  synced_at      = NOW()
WHERE b.updated_at IS NULL OR b.updated_at < EXCLUDED.updated_at;
```
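To make the ON CONFLICT ... WHERE guard concrete, here is a small in-memory model of the same merge rule: unseen UUIDs are inserted, and existing UUIDs are replaced only when the incoming updated_at is strictly newer. It illustrates the semantics and is not a substitute for the SQL; BookingRow and mergeUpsert are hypothetical names, and the row type keeps only the fields the rule needs.

```typescript
// In-memory model of the merge-upsert rule (illustration only; the SQL does the real work)
interface BookingRow {
  uuid: string; // natural key, like the uuid PRIMARY KEY
  status: string;
  updated_at: string; // ISO-8601 timestamp
}

// Apply one staging snapshot to the target, mirroring
// INSERT ... ON CONFLICT (uuid) DO UPDATE ... WHERE target.updated_at < incoming.updated_at
function mergeUpsert(
  target: Map<string, BookingRow>,
  staging: BookingRow[],
): { inserted: number; updated: number } {
  let inserted = 0;
  let updated = 0;
  for (const row of staging) {
    const existing = target.get(row.uuid);
    if (existing === undefined) {
      target.set(row.uuid, { ...row }); // no conflict: plain insert
      inserted++;
    } else if (Date.parse(existing.updated_at) < Date.parse(row.updated_at)) {
      target.set(row.uuid, { ...row }); // conflict and the incoming row is newer: update
      updated++;
    }
    // Otherwise the conflicting row is the same age or older and is left untouched
  }
  return { inserted, updated };
}

// Re-applying an unchanged weekly snapshot is a no-op
const table = new Map<string, BookingRow>();
const snapshot: BookingRow[] = [
  { uuid: 'a', status: 'booked', updated_at: '2025-01-01T00:00:00Z' },
  { uuid: 'b', status: 'booked', updated_at: '2025-01-01T00:00:00Z' },
];
console.log(mergeUpsert(table, snapshot)); // { inserted: 2, updated: 0 }
console.log(mergeUpsert(table, snapshot)); // { inserted: 0, updated: 0 }
```

That second call is the property the weekly full refresh relies on: every record arrives again, but only rows that actually changed are rewritten.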
Step 2: Incremental Sync for TRANSACTION Table
TRANSACTION data is append-only. Use watermark-based sync to pull only new records.
```typescript
import { readFileSync, writeFileSync } from 'node:fs';

// Track the high-watermark for incremental pulls
interface SyncState {
  lastSyncDate: string; // ISO date (YYYY-MM-DD) of the last successful sync
  lastTransactionId: string;
}

const STATE_FILE = '.navan-sync-state.json';

function loadSyncState(): SyncState {
  try {
    return JSON.parse(readFileSync(STATE_FILE, 'utf-8'));
  } catch {
    // No state file yet: start from the initial backfill date
    return { lastSyncDate: '2025-01-01', lastTransactionId: '' };
  }
}

function saveSyncState(state: SyncState): void {
  writeFileSync(STATE_FILE, JSON.stringify(state, null, 2));
}

// Pull transactions since the last watermark (`headers` comes from the Step 1 authentication)
const state = loadSyncState();
const today = new Date().toISOString().split('T')[0];
const txnRes = await fetch(
  `${process.env.NAVAN_BASE_URL}/get_expense_transactions` +
    `?start_date=${state.lastSyncDate}&end_date=${today}`,
  { headers },
);
if (!txnRes.ok) throw new Error(`Transaction extraction failed: HTTP ${txnRes.status}`);
const transactions: Array<{ transaction_id: string }> = await txnRes.json();

// Filter out already-seen transactions. This assumes transaction_id values sort
// (as strings) in creation order; if they do not, dedupe by ID at load time instead.
const newTxns = transactions.filter((t) => t.transaction_id > state.lastTransactionId);
console.log(`New transactions since ${state.lastSyncDate}: ${newTxns.length}`);

// Update the watermark only after the new transactions have been loaded.
// Take the largest ID rather than the last element, since response order is not guaranteed.
if (newTxns.length > 0) {
  const maxId = newTxns.reduce(
    (max, t) => (t.transaction_id > max ? t.transaction_id : max),
    state.lastTransactionId,
  );
  saveSyncState({ lastSyncDate: today, lastTransactionId: maxId });
}
```
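The ID comparison in the watermark only works if transaction IDs sort in creation order. A variant that drops that assumption re-fetches from the last sync date and skips any ID returned by the previous successful run. This is a sketch under two assumptions not confirmed by the source: start_date and end_date are inclusive, and a transaction's date does not change after creation. OverlapSyncState and planIncrement are hypothetical names.

```typescript
// Hypothetical state shape: instead of a single lastTransactionId, keep the IDs
// returned by the previous successful run (they cover the one-day overlap).
interface OverlapSyncState {
  lastSyncDate: string; // YYYY-MM-DD
  seenIds: string[]; // transaction_id values fetched by the previous successful run
}

interface Txn {
  transaction_id: string;
}

// Given what the API returned for [state.lastSyncDate, today], pick the transactions
// not loaded yet and compute the state to persist once the load succeeds.
function planIncrement(
  state: OverlapSyncState,
  fetched: Txn[],
  today: string,
): { toLoad: Txn[]; nextState: OverlapSyncState } {
  const seen = new Set(state.seenIds);
  const toLoad: Txn[] = [];
  for (const t of fetched) {
    if (seen.has(t.transaction_id)) continue; // loaded previously, or a duplicate in this response
    seen.add(t.transaction_id);
    toLoad.push(t);
  }
  const fetchedIds = [...new Set(fetched.map((t) => t.transaction_id))];
  return { toLoad, nextState: { lastSyncDate: today, seenIds: fetchedIds } };
}
```

In the Step 2 flow you would call planIncrement with the fetched array, load toLoad, and persist nextState only after that load commits; a failed run simply re-fetches the same range next time.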
Step 3: Webhook Endpoint for Real-Time Events
```typescript
import { createServer } from 'node:http';
import { createHmac, timingSafeEqual } from 'node:crypto';

// Webhook handler for real-time Navan events
const server = createServer(async (req, res) => {
  if (req.method !== 'POST' || req.url !== '/navan/webhook') {
    res.writeHead(404);
    res.end();
    return;
  }

  // Read the raw body; the signature must be computed over the exact bytes received
  const chunks: Buffer[] = [];
  for await (const chunk of req) chunks.push(chunk as Buffer);
  const rawBody = Buffer.concat(chunks);

  // Verify the webhook signature (hex HMAC-SHA256) with a constant-time comparison
  const header = req.headers['x-navan-signature'];
  const received = Buffer.from(typeof header === 'string' ? header : '', 'utf8');
  const expected = Buffer.from(
    createHmac('sha256', process.env.NAVAN_WEBHOOK_SECRET!).update(rawBody).digest('hex'),
    'utf8',
  );
  if (received.length !== expected.length || !timingSafeEqual(received, expected)) {
    console.error('Invalid webhook signature');
    res.writeHead(401);
    res.end('Unauthorized');
    return;
  }

  let event: any;
  try {
    event = JSON.parse(rawBody.toString('utf8'));
  } catch {
    res.writeHead(400);
    res.end('Bad Request');
    return;
  }
  console.log(`Webhook event: ${event.type}`);

  switch (event.type) {
    case 'booking.created':
      console.log(`New booking: ${event.data.uuid}`);
      break;
    case 'booking.updated':
      console.log(`Booking updated: ${event.data.uuid}`);
      break;
    case 'booking.cancelled':
      console.log(`Booking cancelled: ${event.data.uuid}`);
      break;
    case 'expense.submitted':
      console.log(`Expense submitted: ${event.data.transaction_id}`);
      break;
    case 'expense.approved':
      console.log(`Expense approved: ${event.data.transaction_id}`);
      break;
    default:
      console.log(`Unknown event type: ${event.type}`);
  }

  res.writeHead(200);
  res.end('OK');
});

server.listen(3000, () => console.log('Webhook listener on :3000'));
```
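To exercise the listener locally you need a correctly signed request. The helpers below mirror the scheme the handler checks (hex HMAC-SHA256 over the raw body, sent in the x-navan-signature header); whether Navan signs deliveries exactly this way should be confirmed in its webhook documentation. signPayload, isValidSignature, and the test secret are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Compute the hex HMAC-SHA256 of a raw body, the same scheme the webhook handler verifies
function signPayload(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody).digest('hex');
}

// Constant-time check; returns false for a missing or wrong-length signature
function isValidSignature(rawBody: string, signature: string | undefined, secret: string): boolean {
  if (!signature) return false;
  const expected = Buffer.from(signPayload(rawBody, secret), 'utf8');
  const received = Buffer.from(signature, 'utf8');
  return expected.length === received.length && timingSafeEqual(expected, received);
}

// Build a signed sample event for a local test (placeholder secret, not a real one)
const secret = 'local-test-secret';
const sample = JSON.stringify({ type: 'booking.created', data: { uuid: 'test-uuid-1' } });
console.log(`x-navan-signature: ${signPayload(sample, secret)}`);
```

POST the sample body with that header to http://localhost:3000/navan/webhook while the listener runs with NAVAN_WEBHOOK_SECRET set to the same value, and vary type to hit each branch of the switch.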
Step 4: Airbyte Connector Sync Configuration
```yaml
# Airbyte source-navan connector (v0.0.42)
# Production sync mode configuration
source:
  sourceDefinitionId: source-navan
  connectionConfiguration:
    client_id: "${NAVAN_CLIENT_ID}"
    client_secret: "${NAVAN_CLIENT_SECRET}"

# Sync catalog: bookings stream
syncCatalog:
  streams:
    - stream:
        name: bookings
        jsonSchema: {}
      config:
        # Full Refresh for BOOKING (weekly re-import model)
        syncMode: full_refresh
        destinationSyncMode: overwrite
        # Alternative: append_dedup with uuid as primary key
        # syncMode: full_refresh
        # destinationSyncMode: append_dedup
        # primaryKey: [["uuid"]]

# Schedule: run weekly to match Navan's BOOKING refresh cycle
schedule:
  scheduleType: cron
  cronExpression: "0 0 2 ? * SUN" # Sunday 2am UTC (Airbyte uses Quartz cron syntax)
```
Step 5: Sync Monitoring and Alerting
```typescript
// Monitor sync health per table and flag stale syncs
interface SyncMetrics {
  tableName: string;
  lastSyncTime: Date;
  recordCount: number;
  expectedFrequency: string;
  isStale: boolean;
}

// Each table needs its own last-sync timestamp (BOOKING refreshes weekly, TRANSACTION daily);
// read them from the warehouse, e.g. SELECT MAX(synced_at) FROM navan_booking.
function checkSyncHealth(lastSyncTimes: { BOOKING: Date; TRANSACTION: Date }): SyncMetrics[] {
  const hoursSince = (d: Date) => (Date.now() - d.getTime()) / (1000 * 60 * 60);
  return [
    {
      tableName: 'BOOKING',
      lastSyncTime: lastSyncTimes.BOOKING,
      recordCount: 0, // populated from warehouse query
      expectedFrequency: 'weekly',
      isStale: hoursSince(lastSyncTimes.BOOKING) > 7 * 24 + 6, // alert if > 7.25 days
    },
    {
      tableName: 'TRANSACTION',
      lastSyncTime: lastSyncTimes.TRANSACTION,
      recordCount: 0,
      expectedFrequency: 'daily',
      isStale: hoursSince(lastSyncTimes.TRANSACTION) > 25, // alert if > 25 hours
    },
  ];
}

// Example input; replace with the MAX(synced_at) values from your warehouse
const health = checkSyncHealth({
  BOOKING: new Date(Date.now() - 3 * 24 * 60 * 60 * 1000),
  TRANSACTION: new Date(Date.now() - 2 * 60 * 60 * 1000),
});
health.forEach((m) => {
  const status = m.isStale ? 'STALE' : 'OK';
  console.log(`${m.tableName}: ${status} (last sync: ${m.lastSyncTime.toISOString()})`);
});
```
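Staleness only says when a sync last ran; drift (see the Sync Drift row in the error table) is about whether the warehouse actually holds what the source reports. One simple check compares the record count the API returned for a date window with a COUNT(*) over the same window in the warehouse. This is a sketch: detectDrift is a hypothetical helper, and the 1% tolerance is an arbitrary assumption to tune.

```typescript
// Hypothetical drift check for one table and one date window
interface DriftResult {
  table: string;
  sourceCount: number; // records the Navan API returned for the window
  warehouseCount: number; // COUNT(*) in the warehouse for the same window
  drift: number; // positive: rows missing from the warehouse; negative: extra rows
  driftRatio: number; // |drift| relative to sourceCount
  needsResync: boolean;
}

function detectDrift(
  table: string,
  sourceCount: number,
  warehouseCount: number,
  tolerance = 0.01, // assumed threshold: 1% of the source count
): DriftResult {
  const drift = sourceCount - warehouseCount;
  // With an empty source, any warehouse rows count as full drift
  const driftRatio =
    sourceCount === 0 ? (warehouseCount === 0 ? 0 : 1) : Math.abs(drift) / sourceCount;
  return { table, sourceCount, warehouseCount, drift, driftRatio, needsResync: driftRatio > tolerance };
}

console.log(detectDrift('TRANSACTION', 1000, 950).needsResync); // true (5% > 1%)
```

A positive drift usually means a missed incremental window (fall back to a full refresh and reset the watermark); a negative one usually points at duplicates from a load that skipped the UUID merge.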
Step 6: Idempotent Load Pattern
```typescript
// Idempotent load: re-running with the same records must leave the target table unchanged.
// Steps 1-3 are logged placeholders; connect them to your warehouse client.
async function idempotentLoad(records: any[], tableName: string) {
  // Batch ID for the audit trail; idempotency itself comes from the UUID merge key in step 2
  const batchId = `${tableName}-${new Date().toISOString()}`;

  // 1. Write to staging with batch ID
  console.log(`Loading ${records.length} records to ${tableName}_staging (batch: ${batchId})`);

  // 2. Merge-upsert from staging to target
  //    UUID is the natural key, so the same record always resolves to the same row
  console.log(`Merging ${tableName}_staging -> ${tableName}`);

  // 3. Record batch metadata for audit
  console.log(`Batch ${batchId} complete: ${records.length} records processed`);
  return { batchId, recordCount: records.length, status: 'complete' };
}
```
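The batchId in idempotentLoad embeds the current time, so a retried run gets a fresh ID and the audit trail cannot recognize it as a repeat. A sketch of one alternative derives the ID from the batch content, so the same table and the same records (in any order) always yield the same ID and an already-completed batch can be skipped. contentBatchId, loadOnce, and the in-memory completedBatches set are hypothetical; in production the set would be an audit table.

```typescript
import { createHash } from 'node:crypto';

// Deterministic batch ID: same table + same records (any order) -> same ID.
// Hashing full record content means an updated record produces a new ID.
function contentBatchId(tableName: string, records: object[]): string {
  const hash = createHash('sha256').update(tableName);
  for (const line of records.map((r) => JSON.stringify(r)).sort()) {
    hash.update('\n').update(line);
  }
  return `${tableName}-${hash.digest('hex').slice(0, 16)}`;
}

// Hypothetical audit store of completed batch IDs
const completedBatches = new Set<string>();

function loadOnce(tableName: string, records: object[]): { batchId: string; skipped: boolean } {
  const batchId = contentBatchId(tableName, records);
  if (completedBatches.has(batchId)) return { batchId, skipped: true };
  // ... stage and merge-upsert the records here ...
  completedBatches.add(batchId);
  return { batchId, skipped: false };
}
```

Skipping is an optimization on top of the UUID merge, not a replacement for it: if the audit record is lost, the merge still keeps a re-run from creating duplicates.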
Output
Successful execution produces:
- Full-refresh BOOKING sync with merge-upsert deduplication
- Incremental TRANSACTION sync with watermark state tracking
- Webhook endpoint for real-time event processing
- Configured Airbyte connector with production-ready sync schedule
- Sync health monitoring with staleness alerting
Error Handling
| Error | HTTP Code | Cause | Solution |
|---|---|---|---|
| Unauthorized | 401 | Expired or invalid bearer token | Re-authenticate via POST /authenticate or POST /reauthenticate |
| Rate Limited | 429 | Too many API requests | Use exponential backoff; increase sync interval |
| Timeout | 504 | Full refresh too large | Chunk by date range (30-day windows) |
| Webhook Sig Invalid | 401 | Tampered or replayed event | Verify NAVAN_WEBHOOK_SECRET; check clock skew |
| Duplicate Records | N/A | Missing UUID dedup in BOOKING sync | Apply merge-upsert with ON CONFLICT (uuid) |
| Sync Drift | N/A | Missed incremental window | Fall back to full refresh; reset watermark |
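For the 429 row, exponential backoff can be sketched as below, using "full jitter" (a random delay up to an exponentially growing cap). The base and maximum delays are assumptions, and honoring a Retry-After header only applies if the Navan API sends one, which the source does not state. backoffDelayMs and fetchWithBackoff are hypothetical names; the global Response type assumes Node.js 18+.

```typescript
// Delay for retry attempt n (0-based): random in [0, min(maxMs, baseMs * 2^n))
function backoffDelayMs(
  attempt: number,
  baseMs = 500,
  maxMs = 30_000,
  random: () => number = Math.random,
): number {
  const cap = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * cap);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retry a fetch-like call while it returns 429, up to maxRetries extra attempts
async function fetchWithBackoff(
  doRequest: () => Promise<Response>,
  maxRetries = 5,
  baseMs = 500,
): Promise<Response> {
  for (let attempt = 0; ; attempt++) {
    const res = await doRequest();
    if (res.status !== 429 || attempt >= maxRetries) return res;
    // Prefer a numeric Retry-After (seconds) if present; otherwise back off exponentially
    const retryAfter = Number(res.headers.get('retry-after'));
    const delay =
      Number.isFinite(retryAfter) && retryAfter > 0 ? retryAfter * 1000 : backoffDelayMs(attempt, baseMs);
    await sleep(delay);
  }
}
```

Typical use wraps each API call, for example fetchWithBackoff(() => fetch(url, { headers })); if 429s persist after the retries, lengthen the sync interval as the table suggests.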
Examples
Python — Incremental TRANSACTION sync with watermark:
```python
import json
import os
from datetime import datetime, timezone

import requests

base_url = os.environ['NAVAN_BASE_URL']
auth = requests.post(f'{base_url}/authenticate', json={
    'client_id': os.environ['NAVAN_CLIENT_ID'],
    'client_secret': os.environ['NAVAN_CLIENT_SECRET'],
}, timeout=30)
auth.raise_for_status()
headers = {'Authorization': f'Bearer {auth.json()["access_token"]}'}

# Load watermark
try:
    with open('.navan-sync-state.json') as f:
        state = json.load(f)
except FileNotFoundError:
    state = {'last_sync_date': '2025-01-01'}

# Use the UTC date so the watermark does not depend on the host's timezone
today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
resp = requests.get(
    f'{base_url}/get_expense_transactions',
    params={'start_date': state['last_sync_date'], 'end_date': today},
    headers=headers,
    timeout=60,
)
resp.raise_for_status()
txns = resp.json()
print(f'Fetched {len(txns)} transactions since {state["last_sync_date"]}')

# Load txns into the warehouse here. The next run starts again from today's date,
# so expect overlap and deduplicate on transaction_id during the load.

# Save updated watermark only after the load succeeds
with open('.navan-sync-state.json', 'w') as f:
    json.dump({'last_sync_date': today}, f)
```
Resources
- Navan Help Center — Official documentation and support
- Booking Data Integration — BOOKING table refresh schedule and schema
- Navan Integrations — Fivetran, Airbyte, and Estuary connector details
Next Steps
After configuring data sync, proceed to navan-observability for pipeline monitoring or navan-performance-tuning for optimizing large-volume syncs.