navan-data-sync

Implement incremental sync strategies for Navan BOOKING and TRANSACTION data with ETL pipeline patterns. Use when setting up production data pipelines, debugging sync drift, or adding real-time event processing. Trigger with "navan data sync", "navan incremental sync", "navan ETL pipeline".

claude-code
7 Tools
navan-pack Plugin
saas packs Category

Allowed Tools

Read, Write, Edit, Bash(npm:*), Bash(curl:*), Bash(pip:*), Grep

Provided by Plugin

navan-pack

Claude Code skill pack for Navan (24 skills)

saas packs v1.0.0

Installation

This skill is included in the navan-pack plugin:

/plugin install navan-pack@claude-code-plugins-plus



Navan — Data Sync

Overview

This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tiers — scheduled full-refresh, incremental watermark-based sync, and real-time webhooks — along with Airbyte connector configuration and idempotent SQL upsert patterns.

Prerequisites

  • Navan account with OAuth 2.0 API credentials (see navan-install-auth)
  • Destination warehouse (Snowflake, BigQuery, PostgreSQL, or Redshift)
  • For managed sync: Airbyte instance (Cloud or OSS) with source-navan v0.0.42+
  • For webhooks: publicly accessible HTTPS endpoint for callbacks
  • Node.js 18+ or Python 3.8+
  • Environment variables: NAVAN_CLIENT_ID, NAVAN_CLIENT_SECRET, NAVAN_BASE_URL (plus NAVAN_WEBHOOK_SECRET for the webhook endpoint)
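A missing variable usually surfaces later as a confusing 401 or a request to `undefined/authenticate`. The following is a minimal pre-flight sketch that checks the variable names listed above before any sync step runs. `missingEnv` is a hypothetical helper and is not part of any Navan SDK.

```typescript
// Hypothetical pre-flight helper: return the required variables that are unset or blank.
const REQUIRED_ENV = ['NAVAN_CLIENT_ID', 'NAVAN_CLIENT_SECRET', 'NAVAN_BASE_URL'] as const;

function missingEnv(
  names: readonly string[],
  env: Record<string, string | undefined> = process.env,
): string[] {
  return names.filter((name) => !env[name] || env[name]!.trim() === '');
}

// Report missing variables before any sync step runs
const missing = missingEnv(REQUIRED_ENV);
if (missing.length > 0) {
  // In a real job, throw here instead of continuing
  console.error(`Missing environment variables: ${missing.join(', ')}`);
}
```

Add 'NAVAN_WEBHOOK_SECRET' to the list if the process also serves webhooks.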

Instructions

Step 1: Full-Refresh Sync for BOOKING Table

The BOOKING table is re-imported weekly by Navan. Every record is refreshed, so your sync must use merge-upsert logic to avoid duplicates while capturing updates.


// Authenticate with client credentials to obtain a bearer token
const tokenRes = await fetch(`${process.env.NAVAN_BASE_URL}/authenticate`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    client_id: process.env.NAVAN_CLIENT_ID,
    client_secret: process.env.NAVAN_CLIENT_SECRET,
  }),
});
if (!tokenRes.ok) throw new Error(`Authentication failed: HTTP ${tokenRes.status}`);
const { access_token } = await tokenRes.json();
const headers = { Authorization: `Bearer ${access_token}` };

// Full extraction: no date filter for the weekly refresh
const bookingsRes = await fetch(
  `${process.env.NAVAN_BASE_URL}/get_admin_trips`,
  { headers }
);
if (!bookingsRes.ok) throw new Error(`Booking extraction failed: HTTP ${bookingsRes.status}`);
const bookings = await bookingsRes.json();
console.log(`Extracted ${bookings.length} bookings for full refresh`);
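The extraction above sends each request only once. The error-handling table recommends exponential backoff for HTTP 429. The following is a minimal sketch of that approach. It assumes the global `fetch` available in Node.js 18+ and treats the `Retry-After` header as optional, because it is not confirmed that Navan sends one. `fetchWithBackoff` and `backoffDelay` are hypothetical helper names.

```typescript
// Hypothetical helper: retry on HTTP 429 and 5xx with exponential backoff plus jitter.
async function fetchWithBackoff(
  url: string,
  init: RequestInit = {},
  maxRetries = 5,
  baseDelayMs = 500,
): Promise<Response> {
  for (let attempt = 0; ; attempt++) {
    const res = await fetch(url, init);
    const retryable = res.status === 429 || res.status >= 500;
    if (!retryable || attempt >= maxRetries) return res;

    // Discard the failed response body before retrying
    await res.body?.cancel();

    // Honor Retry-After (in seconds) when present; otherwise back off exponentially
    const retryAfter = Number(res.headers.get('retry-after'));
    const delayMs = Number.isFinite(retryAfter) && retryAfter > 0
      ? retryAfter * 1000
      : backoffDelay(attempt, baseDelayMs);
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
}

// Delay for a given attempt: base * 2^attempt, plus up to 50% random jitter
function backoffDelay(attempt: number, baseDelayMs: number): number {
  const exp = baseDelayMs * 2 ** attempt;
  return exp + Math.random() * exp * 0.5;
}
```

To use it, call `fetchWithBackoff(url, { headers })` wherever the snippet calls `fetch(url, { headers })`. The jitter keeps parallel workers from retrying in lockstep.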

SQL merge-upsert pattern (PostgreSQL):


-- Staging table receives raw API data (truncate it before each weekly load)
CREATE TABLE IF NOT EXISTS navan_booking_staging (
  uuid TEXT PRIMARY KEY,
  traveler_email TEXT,
  origin TEXT,
  destination TEXT,
  start_date DATE,
  end_date DATE,
  total_cost NUMERIC(12,2),
  currency TEXT DEFAULT 'USD',
  department TEXT,
  cost_center TEXT,
  status TEXT,
  in_policy BOOLEAN,
  created_at TIMESTAMPTZ,
  updated_at TIMESTAMPTZ,
  synced_at TIMESTAMPTZ DEFAULT NOW()
);

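-- Target table: same columns, defaults, and primary key as staging
CREATE TABLE IF NOT EXISTS navan_booking (LIKE navan_booking_staging INCLUDING ALL);

-- Merge-upsert: insert new records, update changed records.
-- Columns are listed explicitly so the load does not depend on column order.
INSERT INTO navan_booking AS b (
  uuid, traveler_email, origin, destination, start_date, end_date, total_cost,
  currency, department, cost_center, status, in_policy, created_at, updated_at
)
SELECT
  uuid, traveler_email, origin, destination, start_date, end_date, total_cost,
  currency, department, cost_center, status, in_policy, created_at, updated_at
FROM navan_booking_staging
ON CONFLICT (uuid) DO UPDATE SET
  traveler_email = EXCLUDED.traveler_email,
  origin = EXCLUDED.origin,
  destination = EXCLUDED.destination,
  start_date = EXCLUDED.start_date,
  end_date = EXCLUDED.end_date,
  total_cost = EXCLUDED.total_cost,
  currency = EXCLUDED.currency,
  department = EXCLUDED.department,
  cost_center = EXCLUDED.cost_center,
  status = EXCLUDED.status,
  in_policy = EXCLUDED.in_policy,
  updated_at = EXCLUDED.updated_at,
  synced_at = NOW()
-- Skip rows whose updated_at has not advanced; IS NULL lets rows stored without a timestamp be repaired
WHERE b.updated_at IS NULL OR b.updated_at < EXCLUDED.updated_at;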
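To see why the weekly re-import is safe to repeat, the following in-memory sketch mirrors the merge rule. It inserts unseen UUIDs and updates an existing row only when the incoming `updated_at` is strictly newer. It assumes `updated_at` is always present. `BookingRow` and `mergeUpsert` are illustrative names, and only the fields the rule needs are typed.

```typescript
// Minimal booking shape for illustrating the merge rule
interface BookingRow {
  uuid: string;
  status: string;
  updated_at: string; // ISO timestamp, assumed always present
}

// Apply one full-refresh batch to the current state, keyed by UUID
function mergeUpsert(
  current: Map<string, BookingRow>,
  batch: BookingRow[],
): { inserted: number; updated: number; unchanged: number } {
  let inserted = 0, updated = 0, unchanged = 0;
  for (const row of batch) {
    const existing = current.get(row.uuid);
    if (!existing) {
      current.set(row.uuid, row); // new booking
      inserted++;
    } else if (Date.parse(existing.updated_at) < Date.parse(row.updated_at)) {
      current.set(row.uuid, row); // changed booking
      updated++;
    } else {
      unchanged++; // re-imported but not modified
    }
  }
  return { inserted, updated, unchanged };
}
```

Applying the same batch a second time reports only unchanged rows. The weekly re-import relies on this property.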

Step 2: Incremental Sync for TRANSACTION Table

TRANSACTION data is append-only. Use watermark-based sync to pull only new records.


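import { readFile, writeFile } from 'fs/promises';

// Track high-watermark for incremental pulls
interface SyncState {
  lastSyncDate: string;      // ISO date (YYYY-MM-DD) of last successful sync
  lastTransactionId: string; // highest transaction_id loaded so far
}

const STATE_FILE = '.navan-sync-state.json';

async function loadSyncState(): Promise<SyncState> {
  try {
    return JSON.parse(await readFile(STATE_FILE, 'utf-8'));
  } catch {
    // First run: backfill from a fixed start date
    return { lastSyncDate: '2025-01-01', lastTransactionId: '' };
  }
}

async function saveSyncState(state: SyncState): Promise<void> {
  await writeFile(STATE_FILE, JSON.stringify(state, null, 2));
}

// Pull transactions since last watermark. If start_date is inclusive, the
// boundary day is fetched again and must be de-duplicated.
const state = await loadSyncState();
const today = new Date().toISOString().split('T')[0];

const txnRes = await fetch(
  `${process.env.NAVAN_BASE_URL}/get_expense_transactions` +
  `?start_date=${state.lastSyncDate}&end_date=${today}`,
  { headers }
);
if (!txnRes.ok) throw new Error(`Transaction fetch failed: HTTP ${txnRes.status}`);
const transactions: any[] = await txnRes.json();

// Filter out already-seen transactions. This assumes transaction_id values
// sort in creation order as strings; if they don't, de-duplicate on
// transaction_id in the warehouse instead.
const newTxns = transactions.filter(
  (t) => t.transaction_id > state.lastTransactionId
);
console.log(`New transactions since ${state.lastSyncDate}: ${newTxns.length}`);

// ... load newTxns into the warehouse here ...

// Update the watermark after a successful load. Always advance lastSyncDate,
// so a day with no new transactions doesn't look like a stale sync.
const maxId = newTxns.reduce(
  (max: string, t) => (t.transaction_id > max ? t.transaction_id : max),
  state.lastTransactionId
);
await saveSyncState({ lastSyncDate: today, lastTransactionId: maxId });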
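Comparing `transaction_id` values with `>` only works if IDs sort in creation order, and the API may not guarantee that. An alternative is to remember the IDs returned by the previous fetch and filter on set membership instead. The sketch below assumes `start_date` is inclusive, so only the boundary day can come back twice. `planIncrementalBatch` and the `seenIds` state field are hypothetical and differ from the `SyncState` shape above.

```typescript
interface Txn {
  transaction_id: string;
}

interface IncrementalState {
  lastSyncDate: string; // YYYY-MM-DD, used as the next start_date
  seenIds: string[];    // IDs returned by the previous fetch
}

// Split one fetch into rows to load and the next watermark state
function planIncrementalBatch(
  fetched: Txn[],
  state: IncrementalState,
  today: string,
): { toLoad: Txn[]; nextState: IncrementalState } {
  const previouslySeen = new Set(state.seenIds);
  const thisFetch = new Set<string>();
  const toLoad: Txn[] = [];
  for (const t of fetched) {
    const id = t.transaction_id;
    // Skip IDs loaded last run and duplicates within this response
    if (!previouslySeen.has(id) && !thisFetch.has(id)) toLoad.push(t);
    thisFetch.add(id);
  }
  // The next request starts at `today`, so only this fetch's IDs can repeat
  return { toLoad, nextState: { lastSyncDate: today, seenIds: [...thisFetch] } };
}
```

The set stays small because it holds only one fetch's worth of IDs. A warehouse-side `ON CONFLICT (transaction_id) DO NOTHING` is still a useful second guard.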

Step 3: Webhook Endpoint for Real-Time Events


import { createServer } from 'http';
import { createHmac, timingSafeEqual } from 'crypto';

// Webhook handler for real-time Navan events
const server = createServer(async (req, res) => {
  if (req.method !== 'POST' || req.url !== '/navan/webhook') {
    res.writeHead(404);
    res.end();
    return;
  }

  const chunks: Buffer[] = [];
  for await (const chunk of req) chunks.push(chunk as Buffer);
  const rawBody = Buffer.concat(chunks);

  // Verify the HMAC-SHA256 signature over the raw bytes, in constant time
  const signature = req.headers['x-navan-signature'];
  const expected = Buffer.from(
    createHmac('sha256', process.env.NAVAN_WEBHOOK_SECRET!).update(rawBody).digest('hex')
  );
  const received = Buffer.from(typeof signature === 'string' ? signature : '');

  // timingSafeEqual throws on length mismatch, so compare lengths first
  if (received.length !== expected.length || !timingSafeEqual(received, expected)) {
    console.error('Invalid webhook signature');
    res.writeHead(401);
    res.end('Unauthorized');
    return;
  }

  let event: any;
  try {
    event = JSON.parse(rawBody.toString('utf8'));
  } catch {
    res.writeHead(400);
    res.end('Bad Request');
    return;
  }
  console.log(`Webhook event: ${event.type}`);

  switch (event.type) {
    case 'booking.created':
      console.log(`New booking: ${event.data.uuid}`);
      break;
    case 'booking.updated':
      console.log(`Booking updated: ${event.data.uuid}`);
      break;
    case 'booking.cancelled':
      console.log(`Booking cancelled: ${event.data.uuid}`);
      break;
    case 'expense.submitted':
      console.log(`Expense submitted: ${event.data.transaction_id}`);
      break;
    case 'expense.approved':
      console.log(`Expense approved: ${event.data.transaction_id}`);
      break;
    default:
      console.log(`Unknown event type: ${event.type}`);
  }

  res.writeHead(200);
  res.end('OK');
});

server.listen(3000, () => console.log('Webhook listener on :3000'));
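To test the listener locally, you need requests signed the same way the handler verifies them. The following is a minimal sketch that assumes the scheme the handler checks: a hex-encoded HMAC-SHA256 of the raw body, sent in `x-navan-signature`. Confirm the actual header name and encoding against Navan's webhook documentation. `signPayload` and `isValidSignature` are hypothetical names.

```typescript
import { createHmac, timingSafeEqual } from 'crypto';

// Hex-encoded HMAC-SHA256 of the raw request body
function signPayload(body: string | Buffer, secret: string): string {
  return createHmac('sha256', secret).update(body).digest('hex');
}

// Constant-time check; timingSafeEqual throws on length mismatch, so compare lengths first
function isValidSignature(
  body: string | Buffer,
  signature: string | undefined,
  secret: string,
): boolean {
  if (!signature) return false;
  const expected = Buffer.from(signPayload(body, secret), 'utf8');
  const received = Buffer.from(signature, 'utf8');
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

With the Step 3 server running, POST a JSON body to `http://localhost:3000/navan/webhook` with the header `x-navan-signature` set to `signPayload(body, process.env.NAVAN_WEBHOOK_SECRET!)`.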

Step 4: Airbyte Connector Sync Configuration


# Airbyte source-navan connector (v0.0.42)
# Production sync mode configuration
source:
  sourceDefinitionId: source-navan
  connectionConfiguration:
    client_id: "${NAVAN_CLIENT_ID}"
    client_secret: "${NAVAN_CLIENT_SECRET}"

# Sync catalog — bookings stream
syncCatalog:
  streams:
    - stream:
        name: bookings
        jsonSchema: {}
      config:
        # Full Refresh for BOOKING (weekly re-import model)
        syncMode: full_refresh
        destinationSyncMode: overwrite
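        # Alternative: deduplicate on uuid instead of overwriting.
        # append_dedup is documented for incremental syncs; for a full-refresh
        # stream, newer Airbyte versions offer overwrite_dedup. Confirm which
        # modes source-navan and your destination support.
        # syncMode: full_refresh
        # destinationSyncMode: overwrite_dedup
        # primaryKey: [["uuid"]]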

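# Schedule: run weekly to match Navan's BOOKING refresh cycle.
# Airbyte cron schedules use Quartz syntax (leading seconds field, ? for day-of-month)
schedule:
  scheduleType: cron
  cronExpression: "0 0 2 ? * SUN"  # Sunday 02:00; set the schedule time zone to UTC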

Step 5: Sync Monitoring and Alerting


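// Monitor sync health and detect stale tables
interface SyncMetrics {
  tableName: string;
  lastSyncTime: Date;
  recordCount: number;
  expectedFrequency: string;
  isStale: boolean;
}

const hoursSince = (d: Date) => (Date.now() - d.getTime()) / (1000 * 60 * 60);

// Each table needs its own last-sync time. BOOKING is loaded by a separate
// weekly job, so pass in its last successful load, e.g.
// SELECT MAX(synced_at) FROM navan_booking.
async function checkSyncHealth(lastBookingSync: Date): Promise<SyncMetrics[]> {
  // The TRANSACTION watermark is date-only (midnight UTC of the last run day)
  const state = await loadSyncState();
  const lastTxnSync = new Date(state.lastSyncDate);

  return [
    {
      tableName: 'BOOKING',
      lastSyncTime: lastBookingSync,
      recordCount: 0,  // populated from warehouse query
      expectedFrequency: 'weekly',
      isStale: hoursSince(lastBookingSync) > 7 * 24 + 6, // alert if > 7.25 days
    },
    {
      tableName: 'TRANSACTION',
      lastSyncTime: lastTxnSync,
      recordCount: 0,  // populated from warehouse query
      expectedFrequency: 'daily',
      // A date-only watermark is already up to 24h old when written,
      // so allow one day plus the 25h daily window
      isStale: hoursSince(lastTxnSync) > 24 + 25,
    },
  ];
}

async function reportSyncHealth(lastBookingSync: Date): Promise<void> {
  const health = await checkSyncHealth(lastBookingSync);
  health.forEach(m => {
    const status = m.isStale ? 'STALE' : 'OK';
    console.log(`${m.tableName}: ${status} (last sync: ${m.lastSyncTime.toISOString()})`);
  });
}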
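Staleness does not catch drift in the sense of rows missing from the warehouse. One way to flag drift is to compare the number of records the API returned for a window with the number of rows the warehouse holds for the same window. The helper name, the 1% default tolerance, and how you obtain each count are all assumptions in this sketch.

```typescript
interface DriftResult {
  tableName: string;
  sourceCount: number;
  warehouseCount: number;
  missingRatio: number; // fraction of source rows absent from the warehouse
  drifted: boolean;
}

// Hypothetical check: compare source and warehouse row counts for the same window.
// With tolerance = 0.01, drift is flagged when more than 1% of source rows are missing.
function checkDrift(
  tableName: string,
  sourceCount: number,
  warehouseCount: number,
  tolerance = 0.01,
): DriftResult {
  const missing = Math.max(sourceCount - warehouseCount, 0);
  const missingRatio = sourceCount === 0 ? 0 : missing / sourceCount;
  return { tableName, sourceCount, warehouseCount, missingRatio, drifted: missingRatio > tolerance };
}
```

If a table drifts, the error-handling table's remedy applies: fall back to a full refresh and reset the watermark.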

Step 6: Idempotent Load Pattern


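// Ensure loads are idempotent: safe to re-run without side effects.
// `query` is any SQL executor on ONE connection (e.g. a pg Client, not a Pool),
// so BEGIN/COMMIT apply to the same session. `tableName` must be a trusted
// constant because it is interpolated into SQL; `mergeSql` is the Step 1 upsert.
type Query = (sql: string, params?: unknown[]) => Promise<unknown>;

async function idempotentLoad(query: Query, records: object[], tableName: string, mergeSql: string) {
  const batchId = `${tableName}-${new Date().toISOString()}`;
  await query('BEGIN');
  try {
    // 1. Replace staging contents so a retry never sees a half-written batch
    await query(`TRUNCATE ${tableName}_staging`);
    await query(
      `INSERT INTO ${tableName}_staging SELECT * FROM json_populate_recordset(NULL::${tableName}_staging, $1::json)`,
      [JSON.stringify(records)]
    );
    // 2. Merge-upsert from staging to target; UUID is the natural key,
    //    so the same record always resolves to the same row
    await query(mergeSql);
    await query('COMMIT');
  } catch (err) {
    await query('ROLLBACK');
    throw err;
  }
  // 3. Record batch metadata for audit
  console.log(`Batch ${batchId} complete: ${records.length} records processed`);
  return { batchId, recordCount: records.length, status: 'complete' };
}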
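The `batchId` above embeds the current time, so re-running the same load produces a new ID, and an audit log cannot tell a retry from a new batch. An alternative is to derive the ID from the data: the table name plus the sorted, de-duplicated UUIDs. Identical re-runs then map to the same ID. `contentBatchId` is a hypothetical helper. Because it keys only on UUIDs, include `updated_at` in the hash if changed content should produce a new ID.

```typescript
import { createHash } from 'crypto';

// Deterministic batch ID: same table + same set of record keys => same ID,
// regardless of record order, duplicates, or when the load runs.
function contentBatchId(tableName: string, keys: string[]): string {
  const hash = createHash('sha256');
  hash.update(tableName);
  for (const key of [...new Set(keys)].sort()) {
    hash.update('\n'); // separator so adjacent keys can't run together
    hash.update(key);
  }
  return `${tableName}-${hash.digest('hex').slice(0, 16)}`;
}
```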

Output

Successful execution produces:

  • Full-refresh BOOKING sync with merge-upsert deduplication
  • Incremental TRANSACTION sync with watermark state tracking
  • Webhook endpoint for real-time event processing
  • Configured Airbyte connector with production-ready sync schedule
  • Sync health monitoring with staleness alerting

Error Handling

| Error | HTTP Code | Cause | Solution |
|---|---|---|---|
| Unauthorized | 401 | Expired or invalid bearer token | Re-authenticate via POST /authenticate (or POST /reauthenticate) |
| Rate Limited | 429 | Too many API requests | Use exponential backoff; increase the sync interval |
| Timeout | 504 | Full refresh too large | Chunk by date range (30-day windows) |
| Webhook Sig Invalid | 401 | Tampered or replayed event | Verify NAVAN_WEBHOOK_SECRET; check clock skew |
| Duplicate Records | N/A | Missing UUID dedup in BOOKING sync | Apply merge-upsert with ON CONFLICT (uuid) |
| Sync Drift | N/A | Missed incremental window | Fall back to full refresh; reset the watermark |
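For the timeout case, the following sketch splits an inclusive `start_date`/`end_date` range into consecutive 30-day windows. It assumes the endpoint accepts `YYYY-MM-DD` dates with inclusive bounds, as the `get_expense_transactions` call in Step 2 uses them. Whether `get_admin_trips` accepts date filters is not stated here. `dateWindows` is a hypothetical helper.

```typescript
// Split an inclusive [start, end] date range (YYYY-MM-DD) into consecutive,
// non-overlapping windows of at most `days` days each. Dates are handled in UTC.
function dateWindows(start: string, end: string, days = 30): Array<{ start: string; end: string }> {
  const DAY_MS = 24 * 60 * 60 * 1000;
  const toDate = (s: string) => new Date(`${s}T00:00:00Z`);
  const fmt = (d: Date) => d.toISOString().slice(0, 10);

  const windows: Array<{ start: string; end: string }> = [];
  const last = toDate(end).getTime();
  for (let cur = toDate(start).getTime(); cur <= last; cur += days * DAY_MS) {
    const windowEnd = Math.min(cur + (days - 1) * DAY_MS, last);
    windows.push({ start: fmt(new Date(cur)), end: fmt(new Date(windowEnd)) });
  }
  return windows;
}
```

Each window becomes one request with `?start_date=${w.start}&end_date=${w.end}`. Because both bounds are inclusive, adjacent windows share no days.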

Examples

Python — Incremental TRANSACTION sync with watermark:


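import json
import os
from datetime import datetime, timezone

import requests

base_url = os.environ['NAVAN_BASE_URL']
# Uses a different key than the TypeScript example; don't share this file between them
STATE_FILE = '.navan-sync-state.json'

auth = requests.post(f'{base_url}/authenticate', json={
    'client_id': os.environ['NAVAN_CLIENT_ID'],
    'client_secret': os.environ['NAVAN_CLIENT_SECRET'],
}, timeout=30)
auth.raise_for_status()
headers = {'Authorization': f'Bearer {auth.json()["access_token"]}'}

# Load watermark (first run backfills from a fixed start date)
try:
    with open(STATE_FILE) as f:
        state = json.load(f)
except FileNotFoundError:
    state = {}
last_sync_date = state.get('last_sync_date', '2025-01-01')

# Use the UTC date so the watermark doesn't depend on the host time zone
today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
resp = requests.get(
    f'{base_url}/get_expense_transactions',
    params={'start_date': last_sync_date, 'end_date': today},
    headers=headers,
    timeout=60,
)
resp.raise_for_status()
txns = resp.json()

print(f'Fetched {len(txns)} transactions since {last_sync_date}')

# Load txns into the warehouse here, de-duplicating on transaction_id:
# the next run starts at `today`, so today's rows are fetched again.

# Save the watermark only after the fetch (and load) succeeded
with open(STATE_FILE, 'w') as f:
    json.dump({'last_sync_date': today}, f)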


Next Steps

After configuring data sync, proceed to navan-observability for pipeline monitoring or navan-performance-tuning for optimizing large-volume syncs.
