snowflake-core-workflow-b

Execute Snowflake data transformations with streams, tasks, and dynamic tables.

5 Tools
snowflake-pack Plugin
saas packs Category

Allowed Tools

Read, Write, Edit, Bash(npm:*), Grep

Provided by Plugin

snowflake-pack

Claude Code skill pack for Snowflake data platform — snowflake-sdk, SQL, Snowpark (30 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the snowflake-pack plugin:

/plugin install snowflake-pack@claude-code-plugins-plus

Click to copy

Instructions

Snowflake Core Workflow B — Data Transformation

Overview

Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).

Prerequisites

  • Data loaded into Snowflake (via snowflake-core-workflow-a)
  • Understanding of ELT vs ETL patterns
  • Role with CREATE TASK, CREATE STREAM privileges

Instructions

Step 1: Create a Stream for Change Data Capture


-- Track changes on the raw orders table.
-- Standard stream: captures inserts and deletes; an UPDATE surfaces as a
-- DELETE + INSERT pair (see METADATA$ISUPDATE below).
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders
  APPEND_ONLY = FALSE;

-- Append-only stream (lighter weight, inserts only) — suitable for
-- insert-only event tables where deletes/updates never happen.
CREATE OR REPLACE STREAM events_stream ON TABLE raw_events
  APPEND_ONLY = TRUE;

-- Check what's changed since last consumption.
-- NOTE: a plain SELECT does not advance the stream's offset; only DML that
-- reads the stream inside a committed transaction consumes it.
SELECT * FROM orders_stream;
-- METADATA$ACTION = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID = unique row identifier

Step 2: Create a Task to Process Stream Data


-- Transform task runs when the stream has data.
-- SCHEDULE is the polling interval; the WHEN clause skips the run entirely
-- (no warehouse spin-up) when the stream is empty.
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO dim_orders AS target
  USING (
    SELECT
      order_id,
      customer_id,
      amount::DECIMAL(12,2) AS amount,
      order_date::TIMESTAMP_NTZ AS order_date,
      CASE
        WHEN amount >= 1000 THEN 'high_value'
        WHEN amount >= 100 THEN 'medium_value'
        ELSE 'standard'
      END AS order_tier,
      CURRENT_TIMESTAMP() AS processed_at
    FROM orders_stream
    -- Updates appear as DELETE+INSERT pairs; the INSERT side carries the
    -- post-update values, so keeping only INSERT rows is sufficient here.
    WHERE METADATA$ACTION = 'INSERT'
    -- Dedup to ONE row per order_id: if the same order changed more than
    -- once within an interval, MERGE would otherwise hit a nondeterministic
    -- duplicate-join-key error. If the source provides a change-sequence
    -- column, prefer it over order_date in the ORDER BY.
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY order_id ORDER BY order_date DESC
    ) = 1
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.order_tier = source.order_tier,
    target.processed_at = source.processed_at
  WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, amount, order_date, order_tier, processed_at)
  VALUES
    (source.order_id, source.customer_id, source.amount,
     source.order_date, source.order_tier, source.processed_at);

-- Tasks are created in a suspended state; enable the task.
ALTER TASK transform_orders RESUME;

Step 3: Build a Task DAG (Directed Acyclic Graph)


-- Root task: aggregate yesterday's order metrics at 06:00 America/New_York.
CREATE OR REPLACE TASK daily_metrics_root
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
  -- Explicit column list: a positional INSERT breaks silently if the target
  -- table's column order ever changes.
  INSERT INTO daily_order_metrics
    (metric_date, total_orders, total_revenue, avg_order_value, unique_customers)
  SELECT
    CURRENT_DATE() - 1 AS metric_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM dim_orders
  -- Half-open range covers exactly yesterday, including timestamps after midnight.
  WHERE order_date >= CURRENT_DATE() - 1
    AND order_date < CURRENT_DATE();

-- Child task: runs after the root completes (child tasks take AFTER, not SCHEDULE).
CREATE OR REPLACE TASK update_customer_segments
  WAREHOUSE = TRANSFORM_WH
  AFTER daily_metrics_root
AS
  MERGE INTO customer_segments AS target
  USING (
    -- GROUP BY yields one row per customer, so the MERGE join key is
    -- unique by construction.
    SELECT customer_id,
      COUNT(*) AS order_count,
      SUM(amount) AS lifetime_value,
      CASE
        WHEN SUM(amount) >= 10000 THEN 'platinum'
        WHEN SUM(amount) >= 5000 THEN 'gold'
        WHEN SUM(amount) >= 1000 THEN 'silver'
        ELSE 'bronze'
      END AS segment
    FROM dim_orders GROUP BY customer_id
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET
    target.order_count = source.order_count,
    target.lifetime_value = source.lifetime_value,
    target.segment = source.segment
  -- Explicit column list for the same schema-evolution reason as above.
  WHEN NOT MATCHED THEN INSERT
    (customer_id, order_count, lifetime_value, segment)
  VALUES
    (source.customer_id, source.order_count, source.lifetime_value, source.segment);

-- Resume tasks (children first, then root): resuming the root last prevents
-- it from firing while its children are still suspended.
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;

Step 4: Dynamic Tables (Declarative Alternative)


-- Auto-refreshes based on target freshness — no streams/tasks needed.
-- TARGET_LAG is the maximum staleness allowed before Snowflake refreshes.
CREATE OR REPLACE DYNAMIC TABLE customer_360
  TARGET_LAG = '10 minutes'
  WAREHOUSE = TRANSFORM_WH
AS
  SELECT
    c.customer_id, c.name, c.email,
    -- COUNT(o.order_id) counts non-null values only, so customers with no
    -- orders (all-NULL join columns from the LEFT JOIN) correctly get 0.
    COUNT(o.order_id) AS total_orders,
    -- SUM over zero matching rows is NULL, not 0 — coalesce for no-order customers.
    COALESCE(SUM(o.amount), 0) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
  FROM customers c
  LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
  GROUP BY c.customer_id, c.name, c.email;

-- Monitor refresh status.
-- Unquoted identifiers are stored uppercase, hence the 'CUSTOMER_360' filter.
SELECT name, target_lag, refresh_mode, scheduling_state
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'CUSTOMER_360';

Step 5: Monitor Pipelines


-- Task run history for the last 24 hours, most recent first.
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Find failed runs in the last 24 hours (error_message explains the failure).
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD(hours, -24, CURRENT_TIMESTAMP());

-- Stream lag check — if STALE = TRUE, data may be lost
-- (the stream's offset fell outside the source table's retention window,
-- so unconsumed changes are no longer recoverable).
SHOW STREAMS LIKE 'orders_stream';

Error Handling

Error Cause Solution
Task is suspended Not resumed after creation ALTER TASK x RESUME
Stream is stale Data retention exceeded Recreate stream; increase DATA_RETENTION_TIME_IN_DAYS
Warehouse does not exist Wrong warehouse in task Verify warehouse name
MERGE: duplicate rows Non-unique join key Add dedup CTE before MERGE
Dynamic table refresh failed Source schema changed Check upstream table definitions

Resources

Next Steps

For common errors and troubleshooting, see snowflake-common-errors.

Ready to use snowflake-pack?