Building a Fully Snowflake-Native ELT Pipeline with Stripe and Python

Building a Fully Snowflake-Native ELT Pipeline with Stripe and Python
Image by Author

I recently worked on a project to integrate Stripe with Snowflake for downstream reporting. Although our primary tech stack is built on Azure and typically uses Data Factory for data movement, I wanted to explore using Snowflake’s External Access Integration to connect directly to Stripe.

Instead of relying on external ETL tools, I used Snowflake Notebooks to build a fully Snowflake-native pipeline. I followed the ELT (Extract, Load, Transform) pattern and implemented a Medallion Architecture — loading raw data into Bronze, transforming it into a structured Silver layer, and aggregating insights in the Gold layer. This data is primarily used by business teams for reporting in Power BI.

Since we’ve been using Stripe since 2021, the business requested historical data from that point onward. I first extracted historical data from 2021 to today into the Bronze table (in raw JSON format), then transformed and loaded it into the Silver table (Dynamic), and finally created two curated Gold tables (Dynamic). For this blog, I demonstrate loading the last 180 days only.

Additionally, I built a log table to track each run of the pipeline, recording the number of records extracted and the run status. I also integrated Slack notifications, so our team receives alerts for both successful and failed pipeline runs — all without using any external ETL tools.

Image by Author

In this blog, I’ll walk you through the architecture, how I built each layer, and the limitations I encountered when integrating Stripe directly into Snowflake using only native features.


Step 1: Setup in Snowflake 

Creating database called ‘STRIPE_ELT’, Schema ‘DBO’ and Warehouse ‘STRIPE’

-- Create the databases and schemas 

CREATE OR REPLACE DATABASE STRIPE_ELT;
CREATE OR REPLACE SCHEMA DBO;
CREATE OR REPLACE WAREHOUSE STRIPE;

Second, create Bronze table where I store the raw JSON responses from the Stripe API.

CREATE OR REPLACE TABLE DBO.BRONZE.STRIPE_RAW_TRANSACTIONS (
  DATA VARIANT,
  EXTRACTION_DATE TIMESTAMP_LTZ,
  START_DATE DATE,
  END_DATE DATE
);
  • The DATA column is of type VARIANT — ideal for storing semi-structured JSON from Stripe.
  • EXTRACTION_DATE captures when the data was fetched.
  • START_DATE and END_DATE define the time range covered by the batch, useful for incremental loads and logging.

This table serves as the first layer in the Medallion architecture — preserving raw data for traceability, reprocessing, or future schema changes.

Third, To connect Snowflake directly with external services like Stripe and Slack, we need to define Network Rules and create corresponding External Access Integrations.

We need to create network rule specifies which public internet domains Snowflake is allowed to communicate with (e.g., api.stripe.com or hooks.slack.com), and the external access integration enables secure, outbound access from Snowflake to those endpoints. 

Below are the SQL statements to create the required access configurations. Remember to activate the appropriate integration in your Snowflake Notebook settings. 

-- Network Rule and External Integration for Stripe
CREATE OR REPLACE NETWORK RULE stripe_api_rule
  TYPE = HOST_PORT
  MODE = EGRESS
  VALUE_LIST = ('api.stripe.com:443');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION stripe_ext_access_integration
  ALLOWED_NETWORK_RULES = (stripe_api_rule)
  ENABLED = TRUE;

-- Network Rule and External Integration for Slack

CREATE OR REPLACE NETWORK RULE slack_webhook_rule
  TYPE = HOST_PORT
  MODE = EGRESS
  VALUE_LIST = ('hooks.slack.com:443');

  CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION slack_ext_access_integration
  ALLOWED_NETWORK_RULES = (slack_webhook_rule)
  ENABLED = TRUE;

Fourth, we need to create extraction log_table which is used to log each execution of the Stripe extraction process. It helps with monitoring, auditability, and debugging.

CREATE OR REPLACE TABLE stripe_elt.dbo.extraction_log (
  batch_start_date DATE,
  batch_end_date DATE,
  extraction_date TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP,
  record_count INTEGER,
  status STRING,
  message STRING
);
  • batch_start_date and batch_end_date: Time window for which data was pulled from Stripe
  • extraction_date: Timestamp when the script was run (automatically populated)
  • record_count: Number of records successfully extracted
  • status: Indicates whether the load was a "Success" or "Failed"
  • message: Optional notes, including error messages or success context (e.g., "Loaded into Bronze")

This log is written to every time the pipeline runs, making it easy to build monitoring dashboards or trigger alerts when something fails.


Step 2: Python Script in Snowflake Notebook (Bronze Layer)

The below python script runs inside a Snowflake Notebook and extracts raw data from the Stripe API. It follows the Extract + Load portion of the ELT pipeline and writes raw JSON data into a VARIANT column in the Bronze layer.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructType, StructField, VariantType, StringType
from datetime import datetime, timedelta
from tqdm import tqdm
import stripe
import time

# Stripe API Key 
stripe.api_key = "xxxxxxxxxxxx"

# Create Snowpark session
session = Session.builder.getOrCreate()

# Define date range (last 6 months)
end_date = datetime.utcnow().date()
start_date = end_date - timedelta(days=180)

# Process in 7-day chunks
chunk_days = 7
chunk_ranges = []
chunk_start = start_date

while chunk_start < end_date:
    chunk_end = min(chunk_start + timedelta(days=chunk_days), end_date)
    chunk_ranges.append((chunk_start, chunk_end))
    chunk_start = chunk_end

total_records = 0

# Process with tqdm progress bar
for chunk_start, chunk_end in tqdm(chunk_ranges, desc="Loading Stripe Data"):
    print(f"\n Extracting from {chunk_start} to {chunk_end}...")

    start_ts = int(datetime.combine(chunk_start, datetime.min.time()).timestamp())
    end_ts = int(datetime.combine(chunk_end, datetime.min.time()).timestamp())

    try:
        charges = []
        params = {
            "created": {"gte": start_ts, "lt": end_ts},
            "limit": 100,
            "expand": ["data.customer"]
        }

        for charge in stripe.Charge.list(**params).auto_paging_iter():
            charges.append({
                "data": charge.to_dict(),  # Correct VARIANT-ready object
                "extraction_date": datetime.utcnow().isoformat(),
                "start_date": chunk_start.isoformat(),
                "end_date": chunk_end.isoformat()
            })
            time.sleep(0.05)

        # Define schema for Snowflake Bronze table
        schema = StructType([
            StructField("data", VariantType()),
            StructField("extraction_date", StringType()),
            StructField("start_date", StringType()),
            StructField("end_date", StringType())
        ])

        # Write to Bronze table
        if charges:
            df = session.create_dataframe(charges, schema=schema)
            df.write.mode("append").save_as_table("stripe_elt.dbo.stripe_raw_transactions")
            print(f"Loaded {len(charges)} rows to Bronze")

        # Log to metadata table
        session.sql(f"""
            INSERT INTO stripe_elt.dbo.extraction_log (
              batch_start_date, batch_end_date, record_count, status, message
            ) VALUES (
              '{chunk_start}', '{chunk_end}', {len(charges)},
              'Success', 'Loaded into Bronze'
            )
        """).collect()

        total_records += len(charges)

    except Exception as e:
        print(f"Error from {chunk_start} to {chunk_end}: {str(e)}")

        session.sql(f"""
            INSERT INTO stripe_elt.metadata.extraction_log (
              batch_start_date, batch_end_date, record_count, status, message
            ) VALUES (
              '{chunk_start}', '{chunk_end}', 0,
              'Failed', '{str(e)[:200]}'
            )
        """).collect()

print(f"\n🎉 Historical load complete! Total records inserted: {total_records}")

The above script follows the Extract + Load pattern of the ELT process, where raw data is captured in JSON format and stored in a VARIANT column. It is broken into the following key steps:

  • Session Setup: A Snowpark session is initialized to interact with Snowflake, and the Stripe API is authenticated using a restricted key.
  • Date Range Definition: The script defines a rolling 180-day period and processes the data in 7-day chunks to manage rate limits and improve performance.
  • Chunk Loop: For each 7-day window, it calls the Stripe API, retrieves charges, and stores the full JSON response.
  • Data Loading: The results are written into the stripe_elt.dbo.stripe_raw_transactions table, with VARIANT type used for the raw JSON, and metadata columns for extraction time and date range.
  • Logging: Each run is logged into the stripe_elt.dbo.extraction_log table, capturing the batch start/end dates, number of records extracted, and success or failure status.
  • Progress Tracking: A progress bar (tqdm) is used to show the status of each chunk in real-time.

This script forms the foundation of the pipeline, populating the Bronze layer from which the Silver and Gold layers are derived through transformations using Dynamic Tables.

Below is the sample data from the API stored in the bronze table

Image by Author

Step 3: Transform to Silver Layer

Now that the raw data is loaded into the Bronze layer, the next step is to transform it into a structured format using a Dynamic Table in the Silver layer.

The following SQL creates a dynamic table called stripe_charges_silver. It processes the raw JSON data stored in the Bronze table and extracts structured fields from the VARIANT column.

CREATE OR REPLACE DYNAMIC TABLE stripe_elt.dbo.stripe_charges_silver
  TARGET_LAG = '1 HOUR'
  WAREHOUSE = STRIPE
AS
SELECT
  -- Core Charge Info
  data:id::STRING AS charge_id,
  data:created::NUMBER AS created_unix,
  TO_TIMESTAMP(data:created) AS created_at,
  data:amount::NUMBER / 100 AS amount_usd,
  data:amount_captured::NUMBER / 100 AS amount_captured,
  data:amount_refunded::NUMBER / 100 AS amount_refunded,
  data:currency::STRING AS currency,
  data:status::STRING AS charge_status,
  data:description::STRING AS description,

  -- Customer Info
  data:customer:id::STRING AS customer_id,
  data:customer:email::STRING AS customer_email,
  data:customer:description::STRING AS customer_description,

  -- Billing Details
  data:billing_details:name::STRING AS billing_name,
  data:billing_details:email::STRING AS billing_email,
  data:billing_details:address:city::STRING AS billing_city,
  data:billing_details:address:state::STRING AS billing_state,
  data:billing_details:address:postal_code::STRING AS billing_postal_code,
  data:billing_details:address:country::STRING AS billing_country,

  -- Outcome / Risk
  data:outcome:type::STRING AS outcome_type,
  data:outcome:risk_level::STRING AS outcome_risk_level,
  data:outcome:seller_message::STRING AS outcome_message,

  -- Payment Info
  data:payment_method::STRING AS payment_method_id,
  data:payment_method_details:type::STRING AS payment_type,
  data:payment_method_details:card:brand::STRING AS card_brand,
  data:payment_method_details:card:exp_month::NUMBER AS card_exp_month,
  data:payment_method_details:card:exp_year::NUMBER AS card_exp_year,
  data:payment_method_details:card:funding::STRING AS card_funding,
  data:payment_method_details:card:last4::STRING AS card_last4,

  -- Receipt & Metadata
  data:receipt_email::STRING AS receipt_email,
  data:receipt_number::STRING AS receipt_number,
  data:receipt_url::STRING AS receipt_url,

  -- Metadata
  extraction_date,
  start_date,
  end_date

FROM stripe_elt.dbo.stripe_raw_transactions;
  • The dynamic table automatically refreshes every 1 hour using TARGET_LAG = '1 hour'.
  • It selects and flattens important attributes from the raw Stripe charge data, such as charge amounts, customer details, billing information, and payment metadata.
  • It also retains metadata from the original load (extraction date and date range) for traceability.

This structured Silver layer acts as the foundation for business reporting, allowing users to query clean, analytics-ready data without needing to interpret raw JSON.

Once created, this dynamic table will continue to auto-refresh whenever new data is inserted into the Bronze table.

Step 4: Aggregate into Gold Layer for Power BI

The first Gold table, daily_revenue_summary, aggregates key metrics by charge date, such as total revenue, success rate, and unique customers. The second table, revenue_by_card_and_country, breaks down performance by card brand and billing country. Both tables refresh automatically using TARGET_LAG = '1 hour' and are connected to Power BI for reporting.

-- Daily Revenue Summary
CREATE OR REPLACE DYNAMIC TABLE stripe_elt.gold.daily_revenue_summary
  TARGET_LAG = '1 hour'
  WAREHOUSE = STRIPE  
AS
SELECT
  DATE_TRUNC('DAY', created_at) AS charge_date,
  COUNT(*) AS total_charges,
  SUM(amount_usd) AS total_revenue,
  COUNT_IF(charge_status = 'succeeded') AS successful_charges,
  COUNT_IF(charge_status != 'succeeded') AS failed_charges,
  COUNT(DISTINCT billing_email) AS unique_customers,
  MAX(created_at) AS last_payment_time
FROM stripe_elt.silver.stripe_charges_silver
GROUP BY 1;
-- Revenue by Card and Country
CREATE OR REPLACE DYNAMIC TABLE stripe_elt.gold.revenue_by_card_and_country
  TARGET_LAG = '1 hour'
  WAREHOUSE = TEST  -- replace with your warehouse
AS
SELECT
  COALESCE(card_brand, 'UNKNOWN') AS card_brand,
  COALESCE(billing_country, 'UNKNOWN') AS billing_country,
  COUNT(*) AS total_charges,
  COUNT_IF(charge_status = 'succeeded') AS successful_charges,
  ROUND(SUM(amount_usd), 2) AS total_revenue,
  ROUND(AVG(amount_usd), 2) AS avg_transaction_amount,
  ROUND(100.0 * COUNT_IF(charge_status = 'succeeded') / COUNT(*), 2) AS success_rate_pct,
  MAX(created_at) AS last_payment_time
FROM stripe_elt.silver.stripe_charges_silver
GROUP BY card_brand, billing_country;

The above two tables connected to Power BI for the reporting


Step 5: Enable Slack Notification

To receive alerts on the status of the Stripe data load, I added a simple Slack integration using Python’s requests library. 

import requests
import json

def send_slack_notification(message, webhook_url):
    payload = {
        "text": message
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(webhook_url, headers=headers, data=json.dumps(payload))

    if response.status_code == 200:
        print(" Slack notification sent.")
    else:
        print(f" Slack failed: {response.status_code}, {response.text}")

# Replace with your actual webhook
slack_url = "https://hooks.slack.com/services/xxxx/xxx/xxxx"

# Reuse variables from Stripe extraction cell (must be defined there)
msg = f"""
*Stripe Load Completed*
Window: `{start_time}` → `{end_time}`
Records Loaded: *{len(charges)}*
"""

# Send notification
send_slack_notification(msg, slack_url)

After the data extraction is complete, the above script sends a formatted message to a predefined Slack channel via webhook. The message includes the time window of the load and the number of records processed. This is especially useful for monitoring the pipeline without checking the logs manually. If the Slack API responds with a 200 status, the notification is considered successfully sent; otherwise, the error details are printed. This lightweight approach enables real-time feedback on pipeline execution directly in Slack.


After the historical load, we scheduled the Snowflake Notebook to run every hour. It extracts Stripe data for the past 1-hour window and appends it to the Bronze table. Since the Silver and Gold layers are built using Dynamic Tables, they auto-refresh based on the new data. A Slack alert is triggered after every run to notify the team.

Script for Incremental load

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructType, StructField, VariantType, StringType
from datetime import datetime, timedelta
import stripe
import time

# Stripe API Key (use restricted key in production)
stripe.api_key = "xxxxxxxxxxxxx"

# Create Snowpark session
session = Session.builder.getOrCreate()

# ⏱ Define 1-hour rolling extraction window
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

# Convert to UNIX timestamps for Stripe API
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())

# ⚙ Stripe API request parameters
params = {
    "created": {"gte": start_ts, "lt": end_ts},
    "limit": 100,
    "expand": ["data.customer"]
}

# Extract charges from Stripe
charges = []
try:
    for charge in stripe.Charge.list(**params).auto_paging_iter():
        charges.append({
            "data": charge.to_dict(),
            "extraction_date": datetime.utcnow().isoformat(),
            "start_date": start_time.isoformat(),
            "end_date": end_time.isoformat()
        })
        time.sleep(0.05)  # Slight delay to avoid rate limiting

    # Define schema for Bronze table
    schema = StructType([
        StructField("data", VariantType()),
        StructField("extraction_date", StringType()),
        StructField("start_date", StringType()),
        StructField("end_date", StringType())
    ])

    # Load into Bronze table
    if charges:
        df = session.create_dataframe(charges, schema=schema)
        df.write.mode("append").save_as_table("stripe_elt.dbo.stripe_raw_transactions")
        print(f" Loaded {len(charges)} rows to Bronze")

    # Log to extraction log table
    session.sql(f"""
        INSERT INTO stripe_elt.dbo.extraction_log (
          batch_start_date, batch_end_date, record_count, status, message
        ) VALUES (
          '{start_time}', '{end_time}', {len(charges)},
          'Success', 'Loaded 1-hour batch to Bronze'
        )
    """).collect()

except Exception as e:
    print(f" Error: {str(e)}")

    session.sql(f"""
        INSERT INTO stripe_elt.dbo.extraction_log (
          batch_start_date, batch_end_date, record_count, status, message
        ) VALUES (
          '{start_time}', '{end_time}', 0,
          'Failed', '{str(e)[:200]}'
        )
    """).collect()

This pipeline works well for light-to-moderate data sources where the volume of data and complexity of transformation is manageable. By using Snowflake Notebooks, external access integrations, and dynamic tables, the entire pipeline runs natively in Snowflake without relying on any external ETL tools. This approach is lightweight, flexible, and easy to deploy — making it ideal for modern, serverless data engineering tasks.

However, there are a few important considerations:

  • Historical Loads Can Be Time-Consuming: When extracting large volumes (e.g., all data since 2021), the notebook can take 30+ minutes to complete due to API throttling and backoff logic added to stay within Stripe rate limits.
  • Warehouse Cost Needs Monitoring: Since the Python script uses a Snowflake warehouse for execution and writing, it’s important to size the warehouse correctly and monitor compute usage. Long-running notebooks (like historical loads) can incur noticeable costs if not optimized.
  • Not Ideal for Complex Systems: For high-volume sources like Oracle NetSuite, Salesforce, or SAP, a dedicated ETL/ELT tool is preferred. These platforms often require CDC logic, retries, schema management, and operational monitoring that are difficult to replicate natively.

Thanks for reading.

Feel free to connect with me on LinkedIn!