clari-core-workflow-a

'Build a Clari forecast export pipeline to your data warehouse.

6 Tools
clari-pack Plugin
saas packs Category

Allowed Tools

ReadWriteEditBash(python3:*)Bash(curl:*)Grep

Provided by Plugin

clari-pack

Claude Code skill pack for Clari (18 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the clari-pack plugin:

/plugin install clari-pack@claude-code-plugins-plus

Click to copy

Instructions

Clari Core Workflow: Forecast Export Pipeline

Overview

Primary workflow: build an automated pipeline that exports forecast submissions, quota, adjustments, and CRM data from Clari to your data warehouse. Supports Snowflake, BigQuery, and PostgreSQL as targets.

Prerequisites

  • Completed clari-install-auth and clari-sdk-patterns setup
  • Target database or data warehouse with write access
  • Python 3.10+ with requests and your DB driver

Instructions

Step 1: Define Export Configuration


# config.py
from dataclasses import dataclass

@dataclass
class ExportConfig:
    forecast_name: str          # From Clari forecast list
    time_periods: list[str]     # e.g., ["2026_Q1", "2025_Q4"]
    export_types: list[str] = None
    currency: str = "USD"
    include_historical: bool = True

    def __post_init__(self):
        if self.export_types is None:
            self.export_types = [
                "forecast",           # Submitted forecast call
                "forecast_updated",   # Updated forecast history
                "quota",              # Quota values
                "adjustment",         # Manager adjustments
                "crm_total",          # Total CRM pipeline
                "crm_closed",         # Closed-won CRM amounts
            ]

Step 2: Build the Export Pipeline


# export_pipeline.py
from clari_client import ClariClient
from config import ExportConfig
import json
from datetime import datetime

def run_export(config: ExportConfig) -> list[dict]:
    client = ClariClient()
    all_entries = []

    for period in config.time_periods:
        print(f"Exporting {config.forecast_name} for {period}...")

        data = client.export_and_download(
            forecast_name=config.forecast_name,
            time_period=period,
        )

        entries = data.get("entries", [])
        for entry in entries:
            entry["_exported_at"] = datetime.utcnow().isoformat()
            entry["_forecast_name"] = config.forecast_name

        all_entries.extend(entries)
        print(f"  {len(entries)} records exported")

    return all_entries

def transform_forecast_data(entries: list[dict]) -> dict:
    total_forecast = sum(e.get("forecastAmount", 0) for e in entries)
    total_quota = sum(e.get("quotaAmount", 0) for e in entries)
    total_closed = sum(e.get("crmClosed", 0) for e in entries)

    return {
        "total_forecast": total_forecast,
        "total_quota": total_quota,
        "total_closed": total_closed,
        "attainment_percent": (total_closed / total_quota * 100) if total_quota else 0,
        "coverage_ratio": (total_forecast / total_quota) if total_quota else 0,
        "rep_count": len(entries),
        "reps": entries,
    }

Step 3: Load to Snowflake


# load_snowflake.py
import snowflake.connector

def load_to_snowflake(entries: list[dict], table: str = "CLARI_FORECASTS"):
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        database="REVENUE_DATA",
        schema="CLARI",
    )

    cursor = conn.cursor()
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {table} (
            owner_name VARCHAR,
            owner_email VARCHAR,
            forecast_amount FLOAT,
            quota_amount FLOAT,
            crm_total FLOAT,
            crm_closed FLOAT,
            adjustment_amount FLOAT,
            time_period VARCHAR,
            exported_at TIMESTAMP,
            forecast_name VARCHAR
        )
    """)

    for entry in entries:
        cursor.execute(f"""
            INSERT INTO {table} VALUES (
                %(ownerName)s, %(ownerEmail)s, %(forecastAmount)s,
                %(quotaAmount)s, %(crmTotal)s, %(crmClosed)s,
                %(adjustmentAmount)s, %(timePeriod)s,
                %(_exported_at)s, %(_forecast_name)s
            )
        """, entry)

    conn.commit()
    print(f"Loaded {len(entries)} records to {table}")

Step 4: Schedule with Cron or Airflow


# Run daily export
if __name__ == "__main__":
    config = ExportConfig(
        forecast_name="company_forecast",
        time_periods=["2026_Q1"],
    )
    entries = run_export(config)
    summary = transform_forecast_data(entries)
    print(f"Pipeline complete: {summary['rep_count']} reps, "
          f"${summary['total_forecast']:,.0f} forecast, "
          f"{summary['attainment_percent']:.1f}% attainment")
    load_to_snowflake(entries)

Error Handling

Error Cause Solution
Empty entries No submitted forecasts for period Verify period has data in Clari UI
Job timeout Large export Increase maxpollattempts
Snowflake auth error Wrong credentials Check env vars
Duplicate records Re-run without dedup Add upsert logic with MERGE

Resources

Next Steps

For pipeline analytics and deal inspection, see clari-core-workflow-b.

Ready to use clari-pack?