Change Data Capture in Snowflake Using Streams and Tasks: A Practical Guide

Celestinfo Software Solutions Pvt. Ltd. Feb 27, 2025

Last updated: March 2025

Quick answer: Snowflake Streams track row-level changes (inserts, updates, deletes) on a source table. Tasks execute SQL on a cron or interval schedule. Together, they form a native CDC pipeline: the Stream captures what changed, the Task periodically processes those changes into a target table - no external orchestrator required.

What Streams Are and Why They Exist


A Stream is a change-tracking object attached to a table. It doesn't store a copy of the data. Instead, it records an offset - a pointer to where you last consumed changes. When you query the Stream, Snowflake compares the current table state against that offset and returns the delta: every row that was inserted, updated, or deleted since your last read.


Each row returned by a Stream includes three metadata columns:


| Column | Values | Meaning |
| --- | --- | --- |
| METADATA$ACTION | INSERT or DELETE | The type of change. Updates appear as a DELETE + INSERT pair. |
| METADATA$ISUPDATE | TRUE or FALSE | TRUE when the INSERT/DELETE pair represents an UPDATE. |
| METADATA$ROW_ID | String | A unique identifier for the row, stable across changes. |

The UPDATE representation is the part that trips people up. Snowflake doesn't have a native UPDATE action in Streams. Instead, an update to a row shows up as two rows: one with METADATA$ACTION = 'DELETE' (the old values) and one with METADATA$ACTION = 'INSERT' (the new values), both with METADATA$ISUPDATE = TRUE. You filter on these columns in your merge logic.
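
You can see this representation by querying the Stream directly. A plain SELECT returns the pending delta without consuming it; the offset only advances when the Stream is read inside a committed DML statement. A sketch, assuming a Stream named orders_stream on an orders table:

SQL - Inspecting Stream Metadata
-- Non-consuming peek at pending changes
SELECT order_id, amount,
       METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM orders_stream;

-- Just the new values of updated rows (the INSERT half of each pair)
SELECT order_id, amount
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT'
  AND METADATA$ISUPDATE = TRUE;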


Stream Types


Snowflake offers three Stream types, each tracking different change categories:

- Standard: tracks inserts, updates, and deletes (full CDC).
- Append-only: tracks inserts only; updates and deletes are ignored. Supported on standard tables.
- Insert-only: tracks inserts only; supported on external tables.


SQL - Creating Streams
-- Standard stream (full CDC)
CREATE OR REPLACE STREAM orders_stream ON TABLE raw.orders;

-- Append-only stream (inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw.click_events
  APPEND_ONLY = TRUE;

Tasks: Scheduled SQL Inside Snowflake


A Task is a Snowflake object that runs a SQL statement on a schedule. You can set the schedule using cron syntax or a fixed interval in minutes. Tasks run against a specific warehouse, or on Snowflake-managed (serverless) compute if you omit the WAREHOUSE parameter.


SQL - Creating a Task
-- Run every 5 minutes
CREATE OR REPLACE TASK process_orders_task
  WAREHOUSE = etl_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  INSERT INTO analytics.orders_target
  SELECT order_id, customer_id, amount, order_date
  FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT';

The WHEN SYSTEM$STREAM_HAS_DATA('orders_stream') condition is critical. Without it, the Task fires every 5 minutes regardless of whether there's new data, burning warehouse credits for nothing. With it, the Task only runs when the Stream has unconsumed changes.


Tasks are created in a suspended state. You must explicitly resume them:

SQL
ALTER TASK process_orders_task RESUME;
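
To confirm the Task actually left the suspended state, check its state. A sketch against the task created above:

SQL - Checking Task State
-- The "state" column shows 'suspended' or 'started'
SHOW TASKS LIKE 'process_orders_task';

-- Full definition, schedule, and WHEN condition
DESCRIBE TASK process_orders_task;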

Building a Complete CDC Pipeline


Here's the real-world pattern: a source table receives raw data from an ingestion tool, a Stream tracks changes, and a Task processes those changes into a clean target table using MERGE. This handles inserts, updates, and deletes in one operation.


SQL - Complete CDC Pipeline
-- 1. Source table (populated by your ingestion tool)
CREATE OR REPLACE TABLE raw.customers (
  customer_id   INT,
  name          VARCHAR(200),
  email         VARCHAR(200),
  status        VARCHAR(20),
  updated_at    TIMESTAMP_NTZ
);

-- 2. Target table (clean, deduplicated)
CREATE OR REPLACE TABLE analytics.dim_customers (
  customer_id   INT PRIMARY KEY,
  name          VARCHAR(200),
  email         VARCHAR(200),
  status        VARCHAR(20),
  updated_at    TIMESTAMP_NTZ,
  cdc_action    VARCHAR(10),
  cdc_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 3. Stream on source
CREATE OR REPLACE STREAM raw.customers_stream ON TABLE raw.customers;

-- 4. Task that processes changes every 3 minutes
CREATE OR REPLACE TASK analytics.sync_customers_task
  WAREHOUSE = etl_wh
  SCHEDULE = '3 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw.customers_stream')
AS
  MERGE INTO analytics.dim_customers tgt
  USING (
    SELECT customer_id, name, email, status, updated_at,
           METADATA$ACTION AS action,
           METADATA$ISUPDATE AS is_update
    FROM raw.customers_stream
    -- Drop the DELETE half of each UPDATE pair so a target row never
    -- matches two source rows, which would make the MERGE nondeterministic
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
  ) src
  ON tgt.customer_id = src.customer_id
  WHEN MATCHED AND src.action = 'DELETE' AND src.is_update = FALSE
    THEN DELETE
  WHEN MATCHED AND src.action = 'INSERT' AND src.is_update = TRUE
    THEN UPDATE SET
      tgt.name = src.name,
      tgt.email = src.email,
      tgt.status = src.status,
      tgt.updated_at = src.updated_at,
      tgt.cdc_action = 'UPDATE',
      tgt.cdc_timestamp = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED AND src.action = 'INSERT'
    THEN INSERT (customer_id, name, email, status, updated_at, cdc_action)
    VALUES (src.customer_id, src.name, src.email, src.status, src.updated_at, 'INSERT');

-- 5. Resume the task
ALTER TASK analytics.sync_customers_task RESUME;

The Stale Stream Problem


Streams have a staleness window tied to the source table's data retention. The default DATA_RETENTION_TIME_IN_DAYS for permanent tables is 1 day, but Snowflake automatically extends retention for an unconsumed Stream up to the table's MAX_DATA_EXTENSION_TIME_IN_DAYS, which defaults to 14 days. If you don't consume the Stream within that window, it goes stale - meaning Snowflake can no longer reconstruct the change delta because the historical micro-partitions have been garbage-collected.


A stale Stream can't be recovered. You'll need to drop it, recreate it, and do an initial full load of the target table. To avoid this:

- Schedule the consuming Task to run well inside the staleness window.
- Monitor SHOW STREAMS, which reports a STALE flag and a STALE_AFTER timestamp per Stream.
- If a Stream must sit idle for long stretches, raise MAX_DATA_EXTENSION_TIME_IN_DAYS on the source table.
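
A quick health check before the window closes, as a sketch (SHOW STREAMS exposes the STALE and STALE_AFTER columns):

SQL - Monitoring Stream Staleness
-- STALE tells you if the stream is already unusable;
-- STALE_AFTER is when it will become so
SHOW STREAMS LIKE 'customers_stream' IN SCHEMA raw;

-- TRUE if the stream holds unconsumed changes
SELECT SYSTEM$STREAM_HAS_DATA('raw.customers_stream');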



Task Graphs: Multi-Step Pipelines


A single Task often isn't enough. You might need to stage data, apply business rules, and then load a final table. Task graphs (DAGs) let you chain Tasks with dependencies.


SQL - Task Graph
-- Root task: runs on schedule
CREATE OR REPLACE TASK stage_orders
  WAREHOUSE = etl_wh
  SCHEDULE = '10 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
  INSERT INTO staging.orders_staged
  SELECT * FROM raw.orders_stream;

-- Child task: runs after stage_orders completes
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = etl_wh
  AFTER stage_orders
AS
  INSERT INTO analytics.orders_clean
  SELECT order_id, customer_id, amount,
         CASE WHEN amount > 1000 THEN 'high_value' ELSE 'standard' END AS tier
  FROM staging.orders_staged;

-- Grandchild task: runs after transform_orders
CREATE OR REPLACE TASK aggregate_orders
  WAREHOUSE = etl_wh
  AFTER transform_orders
AS
  MERGE INTO analytics.daily_order_summary tgt
  USING (
    SELECT order_date, tier, SUM(amount) AS total, COUNT(*) AS cnt
    FROM analytics.orders_clean
    GROUP BY order_date, tier
  ) src ON tgt.order_date = src.order_date AND tgt.tier = src.tier
  WHEN MATCHED THEN UPDATE SET tgt.total = src.total, tgt.cnt = src.cnt
  WHEN NOT MATCHED THEN INSERT VALUES (src.order_date, src.tier, src.total, src.cnt);

-- Resume in reverse dependency order
ALTER TASK aggregate_orders RESUME;
ALTER TASK transform_orders RESUME;
ALTER TASK stage_orders RESUME;

Only the root Task has a SCHEDULE. Child Tasks fire automatically when their parent completes. You must resume child Tasks before the root, or Snowflake will skip them.
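
To verify the wiring, the TASK_DEPENDENTS table function lists a root's dependents; its output mirrors SHOW TASKS. A sketch against the graph above:

SQL - Inspecting a Task Graph
-- RECURSIVE => TRUE walks grandchildren too
SELECT name, state, predecessors
FROM TABLE(information_schema.task_dependents(
  task_name => 'stage_orders',
  recursive => TRUE
));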


Error Handling and Monitoring


Tasks support an ERROR_INTEGRATION parameter for sending failure notifications, and you can configure SUSPEND_TASK_AFTER_NUM_FAILURES to auto-suspend a Task after repeated failures. The default setting for standalone tasks is 10.
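
Both parameters can be set on an existing Task; suspend it first, since Snowflake requires a Task to be suspended before modifying it. The notification integration name below (my_error_int) is a placeholder for one you've already created:

SQL - Failure Handling
ALTER TASK process_orders_task SUSPEND;
-- Auto-suspend after 3 consecutive failures instead of the default 10
ALTER TASK process_orders_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;
-- Route failure notifications to a pre-created notification integration
ALTER TASK process_orders_task SET ERROR_INTEGRATION = my_error_int;
ALTER TASK process_orders_task RESUME;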


Monitor Task execution with TASK_HISTORY:

SQL
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(information_schema.task_history(
  task_name => 'PROCESS_ORDERS_TASK',
  scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

The Critical Gotcha: Transaction Control


Here's the mistake that catches almost every team on their first CDC implementation: consuming a Stream in a DML statement advances the Stream's offset as soon as the transaction containing that statement commits. With autocommit on, each statement commits on its own, so if your INSERT into the target table succeeds but a downstream step fails, the Stream offset has already moved forward. Those changes are gone from the Stream.


The fix: always use explicit transaction control when consuming Streams outside of Tasks. Within a Task, Snowflake wraps the execution in an implicit transaction, so the offset only advances if the Task's SQL succeeds. But if you're consuming Streams in stored procedures or multi-statement scripts, wrap everything in BEGIN ... COMMIT:

SQL - Safe Stream Consumption
BEGIN;
  INSERT INTO analytics.target_table
  SELECT * FROM source_stream WHERE METADATA$ACTION = 'INSERT';
COMMIT;

Key Takeaways

- Streams store an offset, not a copy of the data; updates surface as a DELETE + INSERT pair with METADATA$ISUPDATE = TRUE.
- Gate every Task with WHEN SYSTEM$STREAM_HAS_DATA(...) so it only spends warehouse credits when there is work.
- Consume Streams well inside the staleness window; a stale Stream must be dropped, recreated, and backfilled.
- Only the root Task in a graph carries a SCHEDULE; resume child Tasks before the root.
- Outside of Tasks, wrap Stream-consuming DML in explicit BEGIN ... COMMIT blocks.

Frequently Asked Questions

Q: What is a Snowflake Stream?

A Snowflake Stream is a change tracking object on a table that records inserts, updates, and deletes. It exposes metadata columns (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID) that let you identify exactly what changed since the last time the Stream was consumed.

Q: What happens when a Snowflake Stream goes stale?

A Stream goes stale when its offset falls outside the period for which Snowflake retains the table's change history (extended up to 14 days by default via MAX_DATA_EXTENSION_TIME_IN_DAYS). Once stale, the Stream can no longer provide change data and must be recreated. You lose the ability to capture changes that occurred during the gap.

Q: How do Snowflake Tasks differ from cron jobs?

Snowflake Tasks run SQL statements on a schedule (cron syntax or fixed intervals) entirely within Snowflake, using a specified warehouse. Unlike external cron jobs, Tasks support WHEN conditions to skip runs when there's no work, and they can form DAGs (task graphs) for multi-step pipelines.

Q: Does consuming a Snowflake Stream advance its offset even if the transaction fails?

Within a Task, the offset only advances on success. But in stored procedures or scripts, if the DML succeeds and autocommit is on, the offset advances even if downstream steps fail. Always wrap Stream consumption in explicit BEGIN/COMMIT blocks.

Pranay Vatsal, Founder & CEO

Pranay Vatsal is the Founder & CEO of CelestInfo with deep expertise in Snowflake, data architecture, and building production-grade data systems for global enterprises.
