Automated Schema Drift Management in Snowflake Using Cortex LLM

Photo by JJ Ying on Unsplash

One of the biggest challenges I face when building data analytics platforms is managing changes in data structures as they move from source systems to destination tables, since those changes ripple through my ETL/ELT pipelines.

While I work with multiple platforms, most of my recent projects have been in Snowflake, where schema drift detection is relatively straightforward and can be implemented in several ways. Schema drift refers to changes in the structure of our database, such as adding, deleting, or modifying columns, data types, and constraints.

In this blog, I leveraged Snowflake Cortex LLM to automatically understand and propagate schema changes without any manual effort. I’ll explain with a simple example how I used Snowflake Cortex LLM to solve the schema drift challenge. I used the Claude-3.5-Sonnet model, though other models can work as well. Snowflake Cortex allows me to automatically analyze table structures and generate the appropriate DDL statements to propagate schema changes — essentially having AI write SQL for me based on the context of my data pipeline.
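
If you haven't used Cortex before, the building block is a single SQL function. Below is a minimal sketch of calling COMPLETE directly; the prompt is only an illustration, and the model name should be swapped for whatever is available in your region.

-- Minimal sketch: ask Cortex to draft SQL from a plain-English prompt
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'claude-3-5-sonnet',
    'Write a Snowflake ALTER TABLE statement that adds a STRING column named REGION to a table named SILVER. Return only the SQL.'
) AS generated_sql;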

Prerequisites

This schema drift solution requires Snowflake Enterprise edition or higher with Cortex LLM availability in your region (for me it’s not available in Azure Australia East). Your role needs EXECUTE CORTEX MODEL, MONITOR USAGE, and task-related privileges. Be aware that LLM operations consume additional credits, so implement appropriate monitoring.
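
One common setup, sketched below with placeholder names, is to grant the SNOWFLAKE.CORTEX_USER database role to the role running the pipeline and to watch credit consumption through ACCOUNT_USAGE (assuming the CORTEX_FUNCTIONS_USAGE_HISTORY view is available in your account).

-- Sketch only: MY_ETL_ROLE is a placeholder for the role that runs the pipeline
GRANT DATABASE ROLE SNOWFLAKE.CORTEX_USER TO ROLE MY_ETL_ROLE;

-- Track LLM credit consumption over the last week (ACCOUNT_USAGE views can lag)
SELECT model_name, SUM(token_credits) AS credits_used
FROM SNOWFLAKE.ACCOUNT_USAGE.CORTEX_FUNCTIONS_USAGE_HISTORY
WHERE start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP)
GROUP BY model_name;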

The Challenge

In modern data platforms, we typically organize data in layers:

  • Bronze layer: Raw data as it arrives
  • Silver layer: Cleaned and transformed data
  • Gold layer: Business-ready data with aggregations and derived metrics

When the structure of the Bronze layer changes (new columns, data types, etc.), those changes need to be intelligently propagated to Silver and Gold.

The key word is “intelligently” — we can’t just blindly copy columns. Each layer might transform data differently, and we need to preserve those transformations.

For simplicity in this blog, I’m using just one table called BRONZE which serves as our Bronze layer. Let’s assume the data gets updated every day. This data is then transformed in the SILVER table and aggregated into the GOLD table. Below is a diagram showing all three tables with their respective fields.

Image by Author

Methodology

Image by Author

Step 1: Creating Tables

Creating Bronze table

DROP TABLE IF EXISTS GOLD;
DROP TABLE IF EXISTS SILVER;
DROP TABLE IF EXISTS BRONZE;

-- Create BRONZE Table

CREATE OR REPLACE TABLE BRONZE (
    NAME STRING,
    SALES_ID NUMBER(38,0),
    TOTAL_SALES NUMBER(38,2),
    SALE_DATE DATE
);

I created the SILVER and GOLD tables as dynamic tables, which automatically refresh at set intervals and combine the query performance of materialized views with the freshness of regular views. The 1-minute lag ensures near real-time data availability.

-- Create the new SILVER dynamic table

CREATE OR REPLACE DYNAMIC TABLE SILVER
WAREHOUSE = COMPUTE_WH
LAG = '1 MINUTE'
AS
SELECT
    name,
    sales_id,
    total_sales,
    sale_date,
    -- Simple date parts
    EXTRACT(YEAR FROM sale_date) AS year,
    EXTRACT(MONTH FROM sale_date) AS month,
    -- Basic derived fields
    CASE
        WHEN total_sales < 1000 THEN 'Small'
        WHEN total_sales < 5000 THEN 'Medium'
        ELSE 'Large'
    END AS sale_category
FROM bronze
WHERE sale_date IS NOT NULL;

-- Create the new GOLD dynamic table

CREATE OR REPLACE DYNAMIC TABLE GOLD
WAREHOUSE = COMPUTE_WH
LAG = '1 MINUTE'
AS
WITH customer_summary AS (
    SELECT
        name,
        COUNT(sales_id) AS total_transactions,
        SUM(total_sales) AS total_revenue,
        MIN(sale_date) AS first_purchase,
        MAX(sale_date) AS last_purchase
    FROM silver
    GROUP BY name
)
SELECT
    s.name,
    s.sales_id,
    s.total_sales,
    s.sale_date,
    s.year,
    s.month,
    s.sale_category,
    c.total_transactions,
    c.total_revenue,
    c.first_purchase,
    c.last_purchase,
    -- Simple customer categorization
    CASE
        WHEN DATEDIFF('day', c.last_purchase, CURRENT_DATE) > 90 THEN 'Inactive'
        ELSE 'Active'
    END AS customer_status
FROM silver s
JOIN customer_summary c ON s.name = c.name;

Inserting records into BRONZE table

-- Insert sample data into the BRONZE table
INSERT INTO BRONZE (name, sales_id, total_sales, sale_date)
VALUES
    ('Acme Corp', 1001, 2500.00, '2023-01-15'),
    ('Acme Corp', 1002, 3200.50, '2023-03-20'),
    ('Beta LLC', 1003, 750.25, '2023-02-05'),
    ('Gamma Inc', 1004, 5000.00, '2023-04-10'),
    ('Delta Co', 1005, 1200.00, '2023-01-28'),
    ('Epsilon Ltd', 1006, 8500.00, '2023-05-30');

Step 2: Monitoring Schema Drift

To monitor schema drift, I created two tables. The baseline table stores the expected database structure by tracking each column's name and data type across all tables. The second table, log, captures any changes to the schema over time, recording when columns are added, their data types, and whether these changes have been propagated to downstream tables.

--- Create baseline table
CREATE OR REPLACE TABLE baseline (  
    table_name STRING,
    column_name STRING,
    data_type STRING
);

--- Create log table
CREATE OR REPLACE TABLE log (
    change_detected_at TIMESTAMP,
    table_name STRING,
    column_name STRING,
    data_type STRING,
    change_type STRING,
    propagated BOOLEAN DEFAULT FALSE
);

The propagated field in the log table serves as a status indicator. It starts as FALSE by default because when a schema change is first detected, the change hasn't yet been propagated (or applied) to downstream tables.

The purpose of this field is to:

  1. Track which schema changes have been addressed in your data pipeline
  2. Allow for automated or manual processes to update this flag to TRUE once the change has been properly handled

It remains FALSE initially because the task only detects changes but doesn't automatically modify your SILVER or GOLD tables to include new columns. You would need to either manually update those table definitions or create a separate automation process that would:

  1. Identify unpropagated changes
  2. Alter the downstream tables to include the new columns
  3. Set the propagated flag to TRUE once complete

This gives you control over which changes you want to propagate through your data pipeline rather than automatically changing all downstream objects.
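
Before the automated procedure in Step 3 takes over, that review loop can be done by hand. The sketch below shows what it might look like; the values in the UPDATE's WHERE clause are just examples.

-- List schema changes that have not been propagated yet
SELECT change_detected_at, table_name, column_name, data_type, change_type
FROM log
WHERE propagated = FALSE
ORDER BY change_detected_at;

-- After handling a change manually, flag it so it is not picked up again
UPDATE log
SET propagated = TRUE
WHERE table_name = 'BRONZE'
  AND column_name = 'DESCRIPTION';  -- example values only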

Next, I created a monitor task that runs every minute to automatically detect new columns by comparing the current database structure against the baseline. When it finds differences, it logs them as changes and updates the baseline accordingly.

-- Create monitor task
CREATE OR REPLACE TASK monitor
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON * * * * * UTC'  -- Runs every minute
AS
BEGIN
    INSERT INTO log (change_detected_at, table_name, column_name, data_type, change_type)
    SELECT 
        CURRENT_TIMESTAMP AS change_detected_at,
        table_name,
        column_name,
        data_type,
        'ADDED' AS change_type
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE table_schema = 'DBO'
      AND table_catalog = 'SCHEMA_DRIFT_POC'
      AND column_name NOT IN (SELECT column_name FROM baseline)
      AND table_name != 'LOG';
 

    INSERT INTO baseline (table_name, column_name, data_type)
    SELECT table_name, column_name, data_type
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE table_schema = 'DBO'
      AND table_catalog = 'SCHEMA_DRIFT_POC'
      AND (table_name, column_name) NOT IN (SELECT table_name, column_name FROM baseline);

END;

-- Resuming the task
ALTER TASK monitor RESUME;

The next step is to create a drift_log table, which acts as an audit trail for schema update activities. It captures detailed records of each attempt to apply schema changes from source tables to target tables, including timestamps, affected columns, and outcomes.

This table stores information such as when changes were attempted, which tables were involved, what specific column modifications were made, and whether the updates succeeded or failed. For unsuccessful attempts, it preserves error messages to facilitate troubleshooting.

By maintaining this drift_log table, we gain visibility into the automated schema change process, enabling us to monitor schema change activities, verify successful updates, and quickly identify and resolve any issues that arise during updates to downstream tables.

-- Create drift_log table
CREATE OR REPLACE TABLE drift_log (
 log_id NUMBER AUTOINCREMENT,
 changed_at TIMESTAMP_NTZ,
 source_table STRING,
 target_table STRING,
 column_name STRING,
 change_type STRING,
 status STRING,
 error_message STRING,
 PRIMARY KEY (log_id)
);
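
In practice, a simple query against this table is enough to surface failed attempts; for example:

-- Review the most recent failed propagation attempts
SELECT changed_at, source_table, target_table, column_name, error_message
FROM drift_log
WHERE status = 'ERROR'
ORDER BY changed_at DESC
LIMIT 20;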

Step 3: Using the LLM

Intelligent Propagation Procedure:

CREATE OR REPLACE PROCEDURE SCHEMA_DRIFT_POC.DBO.PROPAGATE_SCHEMA_DRIFT()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS '
// Main procedure
function main() {
    // Step 1: Get recent schema changes that need to be propagated
    var schemaChangesQuery = "SELECT scl.table_name, scl.column_name, scl.data_type, scl.change_type, scl.change_detected_at " + 
                            "FROM log scl " + 
                            "WHERE scl.propagated = FALSE " + 
                            "ORDER BY scl.change_detected_at DESC";
    
    try {
        // Get schema changes
        var schemaChanges = snowflake.execute({sqlText: schemaChangesQuery});
        
        // If no changes to propagate, exit
        if (!schemaChanges.next()) {
            return "No schema changes to propagate";
        }
        
        // Step 2: Process each changed table
        do {
            var changedTable = schemaChanges.getColumnValue(1);
            var columnName = schemaChanges.getColumnValue(2);
            var dataType = schemaChanges.getColumnValue(3);
            var changeType = schemaChanges.getColumnValue(4);
            var changeTime = schemaChanges.getColumnValue(5);
            
            // Convert JavaScript Date object to Snowflake timestamp string format
            // This prevents timestamp formatting issues
            var formattedChangeTime = changeTime;
            if (changeTime instanceof Date) {
                formattedChangeTime = changeTime.toISOString().replace(''T'', '' '').replace(''Z'', '''');
            }
            
            // Step 3: Get downstream lineage for the changed table
            var lineageQuery = "SELECT DISTANCE, SOURCE_OBJECT_NAME, TARGET_OBJECT_NAME, TARGET_OBJECT_DOMAIN " + 
                              "FROM TABLE (SNOWFLAKE.CORE.GET_LINEAGE(''SCHEMA_DRIFT_POC.DBO." + changedTable + "'', ''TABLE'', ''DOWNSTREAM'', 10)) " + 
                              "WHERE SOURCE_STATUS = ''ACTIVE'' AND TARGET_STATUS = ''ACTIVE'' " + 
                              "ORDER BY DISTANCE";
            
            var lineageResult = snowflake.execute({sqlText: lineageQuery});
            
            // Step 4: For each downstream table, generate and apply DDL changes
            while (lineageResult.next()) {
                var sourceTable = lineageResult.getColumnValue(2);
                var targetTable = lineageResult.getColumnValue(3);
                var targetDomain = lineageResult.getColumnValue(4);
                
                // Get existing DDL for the target table
                var ddlQuery = "SELECT GET_DDL(''" + targetDomain + "'', ''" + targetTable + "'') AS EXISTING_DDL";
                var ddlResult = snowflake.execute({sqlText: ddlQuery});
                
                if (ddlResult.next()) {
                    var existingDDL = ddlResult.getColumnValue(1);
                    
                    // Format schema change info for LLM prompt
                    var schemaChangeInfo = "Table: " + changedTable + ", Column: " + columnName + ", Type: " + dataType + ", Change: " + changeType;
                    
                    // Create LLM prompt
                    var llmPrompt = "This is the existing DDL for the target table ''" + targetTable + "'':\\n\\n" +
                        existingDDL + "\\n\\n" +
                        "Based on the schema changes detected in the upstream table ''" + sourceTable + "'', shown below:\\n\\n" +
                        schemaChangeInfo + "\\n\\n" +
                        "Make the necessary modifications to the DDL for ''" + targetTable + "'' to incorporate these changes from ''" + sourceTable + "''.\\n" +
                        "Ensure that:\\n" +
                        "1. The structure and formatting of the original DDL is preserved, including any WITH clauses, transformations, or filters.\\n" +
                        "2. The newly added or modified columns are integrated into the DDL appropriately, reflecting only the specified changes.\\n" +
                        "3. Only return the SQL query with the updated structure in plain text—do not include explanations, comments, or markdown.";
                    
                    // Call Snowflake Cortex LLM with properly escaped prompt
                    var escapedPrompt = llmPrompt.replace(/''/g, "''''");
                    var cortexSql = "SELECT snowflake.cortex.complete(''claude-3-5-sonnet'', ''" + escapedPrompt + "'')";
                    var llmResponse = snowflake.execute({sqlText: cortexSql});
                    
                    if (llmResponse.next()) {
                        var newDDL = llmResponse.getColumnValue(1);
                        
                        // Clean up LLM response (remove markdown code blocks if present)
                        newDDL = newDDL.replace(/```sql|```/g, '''').trim();
                        
                        // Apply the new DDL
                        try {
                            snowflake.execute({sqlText: newDDL});
                            
                            // Log the successful update
                            var logQuery = "INSERT INTO drift_log (changed_at, source_table, target_table, column_name, change_type, status) " +
                                          "VALUES (CURRENT_TIMESTAMP, ''" + sourceTable + "'', ''" + targetTable + "'', ''" + 
                                          columnName + "'', ''" + changeType + "'', ''SUCCESS'')";
                            snowflake.execute({sqlText: logQuery});
                        } catch (err) {
                            // Log the failed update
                            var errorMessage = err.toString().replace(/''/g, "''''");
                            var errorLogQuery = "INSERT INTO drift_log (changed_at, source_table, target_table, column_name, change_type, status, error_message) " +
                                               "VALUES (CURRENT_TIMESTAMP, ''" + sourceTable + "'', ''" + targetTable + "'', ''" + 
                                               columnName + "'', ''" + changeType + "'', ''ERROR'', ''" + errorMessage + "'')";
                            snowflake.execute({sqlText: errorLogQuery});
                        }
                    }
                }
            }
            
           
            var updateQuery = "UPDATE log " +
                              "SET propagated = TRUE " +
                              "WHERE table_name = ''" + changedTable + "'' " +
                              "AND column_name = ''" + columnName + "'' " +
                              "AND propagated = FALSE";
            
            snowflake.execute({sqlText: updateQuery});
            
        } while (schemaChanges.next());
        
        return "Schema drift propagation completed successfully";
    } catch (err) {
        return "Error: " + err;
    }
}

// Execute the main procedure
return main();
';
  • First, it constructs a SQL query to identify schema changes that haven’t yet been propagated through the data pipeline. It targets records in the log table where propagated = FALSE, ordering them by detection time in descending order so that the most recent changes are processed first.
  • Second, for each unpropagated schema change found, this code extracts the essential details: which table was changed, what column was added or modified, the data type of that column, the type of change that occurred, and when the change was detected. This information is retrieved from the result set and stored in variables for later use in the propagation process.
  • Third, one of the coolest parts! The procedure uses Snowflake’s lineage features to map out which tables are downstream from our changed table. This shows the data flow path through our pipeline, so we know exactly which tables need updating.
  • Fourth, for each table that needs updating, we first get its current definition (DDL) using Snowflake’s GET_DDL function. This gives us the complete SQL that defines the table structure.
  • Fifth, this is where the magic happens! We use Snowflake Cortex LLM (claude-3-5-sonnet) to intelligently update the table definition. We send the AI: the current table definition, information about the schema change, and specific instructions (prompt) on how to modify the SQL. Below is the prompt I used in this example:
This is the existing DDL for the target table '{targetTable}':

{existingDDL}

Based on the schema changes detected in the upstream table '{sourceTable}', shown below:

{schemaChangeInfo}

Make the necessary modifications to the DDL for '{targetTable}' to incorporate these changes from '{sourceTable}'.
Ensure that:
1. The structure and formatting of the original DDL is preserved, including any WITH clauses, transformations, or filters.
2. The newly added or modified columns are integrated into the DDL appropriately, reflecting only the specified changes.
3. Only return the SQL query with the updated structure in plain text—do not include explanations, comments, or markdown.
  • Sixth, we execute the AI-generated SQL to update the table definition. We then log the outcome (success or failure) in our drift_log table for audit purposes.
  • Finally, we mark the change as applied (propagated = TRUE) in our log table, so we don't process it again in future runs.

The above stored procedure propagates detected schema changes through our data pipeline. For each change recorded by the monitor task, it uses Snowflake Cortex AI to intelligently generate the necessary DDL statements and update all downstream tables (SILVER & GOLD). This ensures that schema changes flow seamlessly through the entire pipeline without requiring manual intervention.
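
For debugging, the procedure can also be invoked on demand, and the lineage it walks can be inspected directly. The sketch below reuses the same objects defined above.

-- Run the propagation procedure outside its schedule
CALL SCHEMA_DRIFT_POC.DBO.PROPAGATE_SCHEMA_DRIFT();

-- Inspect the downstream lineage for BRONZE, as the procedure does internally
SELECT distance, source_object_name, target_object_name, target_object_domain
FROM TABLE(SNOWFLAKE.CORE.GET_LINEAGE('SCHEMA_DRIFT_POC.DBO.BRONZE', 'TABLE', 'DOWNSTREAM', 10))
WHERE source_status = 'ACTIVE' AND target_status = 'ACTIVE'
ORDER BY distance;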

The final step is to schedule the stored procedure by creating a task that runs once daily, applying any detected schema changes throughout the data pipeline.

CREATE OR REPLACE TASK schema_drift_propagator
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 0 * * * UTC' 
AS
CALL propagate_schema_drift();
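
Like the monitor task, a newly created task starts out suspended, so it needs to be resumed. Task history can then be checked to confirm it is firing; a quick sketch:

-- Tasks are created in a suspended state, so enable it
ALTER TASK schema_drift_propagator RESUME;

-- Confirm recent executions of the task
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'SCHEMA_DRIFT_PROPAGATOR'))
ORDER BY scheduled_time DESC
LIMIT 10;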

Testing the Solution

I tested the solution by adding two new columns (DESCRIPTION, REGION) to the BRONZE table and inserting a row with values for them.

-- Add the new columns to the bronze table
ALTER TABLE bronze ADD COLUMN DESCRIPTION STRING;
ALTER TABLE bronze ADD COLUMN REGION STRING;

-- Insert a row with values for all columns including the new ones
INSERT INTO bronze (Name, Sales_ID, Total_sales, SALE_DATE, DESCRIPTION, REGION) 
VALUES ('Victory', 25, 67.42, '2025-03-25', 'WELCOME TO AFRICA', 'INDIA');

Image by Author
Image by Author

Above is a snapshot of the tables. We can see that the DESCRIPTION and REGION columns were added to the SILVER and GOLD tables, and that these changes are also reflected in both the LOG and DRIFT_LOG tables.
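
To reproduce the check yourself, queries along these lines show the new columns flowing through and the corresponding audit entries:

-- Confirm the new columns made it into the downstream tables
SELECT name, region, description FROM SILVER LIMIT 5;
SELECT name, region, description FROM GOLD LIMIT 5;

-- Review what was detected and what was applied
SELECT * FROM log ORDER BY change_detected_at DESC;
SELECT * FROM drift_log ORDER BY changed_at DESC;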

I tested the solution with multiple fields by adding and removing columns, and it’s working reliably. The procedure typically takes 5–7 minutes to execute, though sometimes it completes faster. During implementation, I experimented with different prompts to optimize the LLM’s understanding of schema changes. One important consideration is monitoring the credit usage associated with these LLM calls in a production environment.

Initially, I was planning to create my own AI agent for handling schema drift. However, after discovering Snowflake’s tutorial “An ETL-LLM Integration Guide: Automate and Ensure Consistent Schema Change Propagation using AI”, I decided to adapt their approach using stored procedures, which aligns with my preferred method for handling schema drift even without LLM integration.

This solution combines the best of both worlds — the reliability of traditional stored procedure-based automation with the intelligence of AI-generated DDL modifications. The result is a robust, self-healing data pipeline that requires minimal maintenance while adapting to upstream schema changes automatically.

Thanks for reading.

Feel free to connect with me on LinkedIn!

Reference

http://quickstarts.snowflake.com/guide/schema_lineage_auto_propagation_llm/index.html?index=..%2F..index#5