Change Data Capture in Snowflake Using Streams and Tasks: A Practical Guide
Last updated: March 2025
Quick answer: Snowflake Streams track row-level changes (inserts, updates, deletes) on a source table. Tasks execute SQL on a cron or interval schedule. Together, they form a native CDC pipeline: the Stream captures what changed, the Task periodically processes those changes into a target table - no external orchestrator required.
What Streams Are and Why They Exist
A Stream is a change-tracking object attached to a table. It doesn't store a copy of the data. Instead, it records an offset - a pointer to where you last consumed changes. When you query the Stream, Snowflake compares the current table state against that offset and returns the delta: every row that was inserted, updated, or deleted since your last read.
Each row returned by a Stream includes three metadata columns:
| Column | Values | Meaning |
|---|---|---|
| METADATA$ACTION | INSERT or DELETE | The type of change. Updates appear as a DELETE + INSERT pair. |
| METADATA$ISUPDATE | TRUE or FALSE | TRUE when the INSERT/DELETE pair represents an UPDATE. |
| METADATA$ROW_ID | String | A unique identifier for the row, stable across changes. |
The UPDATE representation is the part that trips people up. Snowflake doesn't have a native UPDATE action in Streams. Instead, an update to a row shows up as two rows: one with METADATA$ACTION = 'DELETE' (the old values) and one with METADATA$ACTION = 'INSERT' (the new values), both with METADATA$ISUPDATE = TRUE. You filter on these columns in your merge logic.
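To make the pair concrete, here's a minimal sketch (assuming a stream named orders_stream already exists on a raw.orders table; the column values are illustrative):

```sql
-- Assumes a stream named orders_stream already exists on raw.orders
UPDATE raw.orders SET amount = 150.00 WHERE order_id = 42;

-- The stream reports the update as two rows:
SELECT order_id, amount, METADATA$ACTION, METADATA$ISUPDATE
FROM orders_stream
WHERE order_id = 42;
-- Row 1: old amount, METADATA$ACTION = 'DELETE', METADATA$ISUPDATE = TRUE
-- Row 2: 150.00,     METADATA$ACTION = 'INSERT', METADATA$ISUPDATE = TRUE
```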
Stream Types
Snowflake offers three Stream types, each tracking different change categories:
- Standard (default) - Tracks inserts, updates, and deletes. Use this for full CDC.
- Append-only - Tracks only inserts. Ignores updates and deletes. Lighter weight and useful for append-only source tables like event logs.
- Insert-only - Available only on external tables. Tracks rows introduced by new files landing in the external stage; removed files and deletes are not reflected.
-- Standard stream (full CDC)
CREATE OR REPLACE STREAM orders_stream ON TABLE raw.orders;

-- Append-only stream (inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw.click_events APPEND_ONLY = TRUE;
Tasks: Scheduled SQL Inside Snowflake
A Task is a Snowflake object that runs a SQL statement on a schedule. You can set the schedule using cron syntax or a fixed interval in minutes. Tasks run against a warehouse you specify, or against Snowflake-managed serverless compute if you omit the WAREHOUSE parameter.
-- Run every 5 minutes
CREATE OR REPLACE TASK process_orders_task
WAREHOUSE = etl_wh
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
INSERT INTO analytics.orders_target
SELECT order_id, customer_id, amount, order_date
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT';
The WHEN SYSTEM$STREAM_HAS_DATA('orders_stream') condition is critical. Without it, the Task fires every 5 minutes regardless of whether there's new data, burning warehouse credits for nothing. With it, the Task only runs when the Stream has unconsumed changes.
Tasks are created in a suspended state. You must explicitly resume them:
ALTER TASK process_orders_task RESUME;
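During development, you can also trigger a run immediately instead of waiting for the next scheduled window:

```sql
-- Run the task once, right now, independent of its schedule
EXECUTE TASK process_orders_task;

-- Confirm the task's state and schedule
SHOW TASKS LIKE 'process_orders_task';
```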
Building a Complete CDC Pipeline
Here's the real-world pattern: a source table receives raw data from an ingestion tool, a Stream tracks changes, and a Task processes those changes into a clean target table using MERGE. This handles inserts, updates, and deletes in one operation.
-- 1. Source table (populated by your ingestion tool)
CREATE OR REPLACE TABLE raw.customers (
customer_id INT,
name VARCHAR(200),
email VARCHAR(200),
status VARCHAR(20),
updated_at TIMESTAMP_NTZ
);
-- 2. Target table (clean, deduplicated)
CREATE OR REPLACE TABLE analytics.dim_customers (
customer_id INT PRIMARY KEY,
name VARCHAR(200),
email VARCHAR(200),
status VARCHAR(20),
updated_at TIMESTAMP_NTZ,
cdc_action VARCHAR(10),
cdc_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- 3. Stream on source
CREATE OR REPLACE STREAM raw.customers_stream ON TABLE raw.customers;
-- 4. Task that processes changes every 3 minutes
CREATE OR REPLACE TASK analytics.sync_customers_task
WAREHOUSE = etl_wh
SCHEDULE = '3 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.customers_stream')
AS
MERGE INTO analytics.dim_customers tgt
USING (
SELECT customer_id, name, email, status, updated_at,
METADATA$ACTION AS action,
METADATA$ISUPDATE AS is_update
FROM raw.customers_stream
) src
ON tgt.customer_id = src.customer_id
WHEN MATCHED AND src.action = 'DELETE' AND src.is_update = FALSE
THEN DELETE
WHEN MATCHED AND src.action = 'INSERT' AND src.is_update = TRUE
THEN UPDATE SET
tgt.name = src.name,
tgt.email = src.email,
tgt.status = src.status,
tgt.updated_at = src.updated_at,
tgt.cdc_action = 'UPDATE',
tgt.cdc_timestamp = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND src.action = 'INSERT'
THEN INSERT (customer_id, name, email, status, updated_at, cdc_action)
VALUES (src.customer_id, src.name, src.email, src.status, src.updated_at, 'INSERT');
-- 5. Resume the task
ALTER TASK analytics.sync_customers_task RESUME;
The Stale Stream Problem
Streams rely on the source table's Time Travel history. A permanent table's DATA_RETENTION_TIME_IN_DAYS defaults to just 1 day, but Snowflake temporarily extends the retention period for an unconsumed Stream, up to the table's MAX_DATA_EXTENSION_TIME_IN_DAYS (14 days by default). If you don't consume the Stream within that extended window, it goes stale - meaning Snowflake can no longer reconstruct the change delta because the historical micro-partitions have been purged.
A stale Stream can't be recovered. You'll need to drop it, recreate it, and do an initial full load of the target table. To avoid this:
- Raise the table's MAX_DATA_EXTENSION_TIME_IN_DAYS above its 14-day default for infrequently consumed Streams.
- Monitor Stream staleness with the STALE_AFTER column in SHOW STREAMS.
- Set up alerts using Snowflake's notification integration when a Stream approaches its stale date.
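One way to implement that monitoring is to post-process the SHOW output with RESULT_SCAN. A sketch (note that SHOW result columns are lowercase and must be double-quoted):

```sql
SHOW STREAMS IN ACCOUNT;

-- Streams that will go stale within the next 3 days
SELECT "name", "table_name", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "stale_after" < DATEADD('day', 3, CURRENT_TIMESTAMP());
```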
Task Graphs: Multi-Step Pipelines
A single Task often isn't enough. You might need to stage data, apply business rules, and then load a final table. Task graphs (DAGs) let you chain Tasks with dependencies.
-- Root task: runs on schedule
CREATE OR REPLACE TASK stage_orders
WAREHOUSE = etl_wh
SCHEDULE = '10 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
INSERT INTO staging.orders_staged
SELECT * FROM raw.orders_stream;
-- Child task: runs after stage_orders completes
CREATE OR REPLACE TASK transform_orders
WAREHOUSE = etl_wh
AFTER stage_orders
AS
INSERT INTO analytics.orders_clean
SELECT order_id, customer_id, amount,
CASE WHEN amount > 1000 THEN 'high_value' ELSE 'standard' END AS tier
FROM staging.orders_staged;
-- Grandchild task: runs after transform_orders
CREATE OR REPLACE TASK aggregate_orders
WAREHOUSE = etl_wh
AFTER transform_orders
AS
MERGE INTO analytics.daily_order_summary tgt
USING (
SELECT order_date, tier, SUM(amount) AS total, COUNT(*) AS cnt
FROM analytics.orders_clean
GROUP BY order_date, tier
) src ON tgt.order_date = src.order_date AND tgt.tier = src.tier
WHEN MATCHED THEN UPDATE SET tgt.total = src.total, tgt.cnt = src.cnt
WHEN NOT MATCHED THEN INSERT VALUES (src.order_date, src.tier, src.total, src.cnt);
-- Resume in reverse dependency order
ALTER TASK aggregate_orders RESUME;
ALTER TASK transform_orders RESUME;
ALTER TASK stage_orders RESUME;
Only the root Task has a SCHEDULE. Child Tasks fire automatically when their parent completes. You must resume child Tasks before the root, or Snowflake will skip them.
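Resuming a deep graph task-by-task gets tedious. Snowflake provides a system function that enables the root and all of its dependents in one call:

```sql
-- Enables stage_orders plus every child and grandchild task in the graph
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('stage_orders');
```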
Error Handling and Monitoring
Tasks support an ERROR_INTEGRATION parameter for sending failure notifications, and you can configure SUSPEND_TASK_AFTER_NUM_FAILURES to auto-suspend a Task after repeated failures. The default setting for standalone tasks is 10.
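A hedged sketch of both settings (my_error_integration is a placeholder for a notification integration you would create separately; a task must be suspended before it can be altered):

```sql
-- Suspend before altering
ALTER TASK process_orders_task SUSPEND;

-- Auto-suspend after 3 consecutive failures; route errors to an integration
ALTER TASK process_orders_task SET
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3,
  ERROR_INTEGRATION = my_error_integration;

ALTER TASK process_orders_task RESUME;
```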
Monitor Task execution with TASK_HISTORY:
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(information_schema.task_history(
task_name => 'PROCESS_ORDERS_TASK',
scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
The Critical Gotcha: Transaction Control
Here's the mistake that catches almost every team on their first CDC implementation: consuming a Stream in a DML statement advances the Stream's offset as soon as that statement's transaction commits. With autocommit on, each statement commits in its own transaction, so if your INSERT into the target table succeeds but a downstream step fails, the offset has already moved forward. Those changes are gone from the Stream.
The fix: always use explicit transaction control when consuming Streams outside of Tasks. Within a Task, Snowflake wraps the execution in an implicit transaction, so the offset only advances if the Task's SQL succeeds. But if you're consuming Streams in stored procedures or multi-statement scripts, wrap everything in BEGIN ... COMMIT:
BEGIN;

INSERT INTO analytics.target_table
SELECT * FROM source_stream
WHERE METADATA$ACTION = 'INSERT';

COMMIT;
Key Takeaways
- Streams track row-level changes (INSERT, UPDATE as DELETE+INSERT pairs, DELETE) using metadata columns.
- Tasks execute SQL on a schedule. Use SYSTEM$STREAM_HAS_DATA() to avoid wasting warehouse credits on empty runs.
- Task graphs let you chain multi-step pipelines. Only the root Task needs a SCHEDULE.
- Streams go stale after the table's retention window expires. Monitor with SHOW STREAMS and set adequate retention.
- Always use explicit transaction control when consuming Streams in stored procedures to prevent offset advancement on failure.
- For simpler incremental pipelines, consider Dynamic Tables as an alternative to Streams + Tasks.
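As a sketch of that alternative, a Dynamic Table declares the desired result plus a freshness target, and Snowflake manages the incremental refresh that the Stream + Task + MERGE pattern implements by hand (names reuse the earlier examples; orders_latest is illustrative):

```sql
CREATE OR REPLACE DYNAMIC TABLE analytics.orders_latest
  TARGET_LAG = '5 minutes'
  WAREHOUSE = etl_wh
AS
SELECT order_id, customer_id, amount, order_date
FROM raw.orders;
```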
Frequently Asked Questions
Q: What is a Snowflake Stream?
A Snowflake Stream is a change tracking object on a table that records inserts, updates, and deletes. It exposes metadata columns (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID) that let you identify exactly what changed since the last time the Stream was consumed.
Q: What happens when a Snowflake Stream goes stale?
A Stream goes stale when its offset falls outside the source table's available change history; Snowflake extends retention for an unconsumed Stream up to the table's MAX_DATA_EXTENSION_TIME_IN_DAYS (14 days by default). Once stale, the Stream can no longer provide change data and must be recreated. You lose the ability to capture changes that occurred during the gap.
Q: How do Snowflake Tasks differ from cron jobs?
Snowflake Tasks run SQL statements on a schedule (cron syntax or fixed intervals) entirely within Snowflake, using a specified warehouse. Unlike external cron jobs, Tasks support WHEN conditions to skip runs when there's no work, and they can form DAGs (task graphs) for multi-step pipelines.
Q: Does consuming a Snowflake Stream advance its offset even if the transaction fails?
Within a Task, the offset only advances on success. But in stored procedures or scripts, if the DML succeeds and autocommit is on, the offset advances even if downstream steps fail. Always wrap Stream consumption in explicit BEGIN/COMMIT blocks.
