Introduction¶

An efficient way to migrate Snowflake data to Databricks is to export the Snowflake tables to Parquet and then import them into Databricks, using a cloud storage container to hold the intermediary Parquet objects. Keeping the container in the same cloud region as the Snowflake and Databricks tenants improves speed and avoids egress charges. This approach is substantially faster and cheaper than a typical database-to-database SQL connection setup and is consistent with the method described in the Databricks AI Summit session on Snowflake migration practices.

Keep in mind that this method is intended for transferring data that is no longer changing. This could be historical data, or a point-in-time migration of data that will then be wired up with ongoing processing logic.

The data validation steps here are minimal and would need expanding in a real-world setting: we capture a row count from the export and import processes and visually compare them, and we visually compare the first 10 rows from the Snowflake source and the Databricks destination table.

Both Snowflake and Databricks SQL provide a COPY INTO statement that performs the heavy lifting of exporting and then importing the Parquet objects. The "legacy" process within Databricks for accessing external objects is to mount the cloud store under DBFS (Databricks File System) and then reference the object using the DBFS path name. This has been deprecated, and the preferred method is to use a URI along with an access credential to reach the object. The documentation for making this work with Azure ADLS is thin, with most material and examples based on AWS S3; I had to experiment to "back into" the correct methods detailed in this notebook.
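
For orientation, the URI-plus-credential pattern has this general shape (placeholders in angle brackets; the full working statement appears in the import section of this notebook):

    COPY INTO <catalog>.<schema>.<table>
    FROM "wasbs://<container>@<account>.blob.core.windows.net/<folder>"
      WITH ( CREDENTIAL( AZURE_SAS_TOKEN = "<SAS token>" ))
    FILEFORMAT = PARQUET;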

The access credentials for interfacing with Snowflake and the Cloud storage are hidden within Databricks Secrets as a best practice.

Kurt Rosenfeld kurtr@ctidata.com



Note: This notebook was developed in, and runs within, the native Databricks notebook environment. It can likely be made to work in other IDEs (e.g., Visual Studio Code or native Jupyter) by installing Databricks' IDE tooling (such as the Databricks extension or Databricks Connect) in your desktop environment.

How it works¶

The notebook migrates a Snowflake table, called tableName, to a Databricks Unity Catalog table having the same name.

Snowflake employs a three-level namespace of DATABASE.SCHEMA.TABLE, and the tables we want to export reside in patient.silver.

Databricks Unity Catalog also employs a three-level namespace of CATALOG.SCHEMA.TABLE. We create a new catalog and schema, patient_import.silver, into which the tables will be imported under the same names.

For example:

Snowflake table              Databricks table                    Parquet objects
patient.silver.admissions    patient_import.silver.admissions    admissions/admissions_1.snappy.parquet
                                                                 admissions/admissions_2.snappy.parquet
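
The same mapping, expressed as a small Python sketch (purely illustrative; the notebook derives these names from configuration variables defined later):

    tableName   = "admissions"
    snowTable   = f"patient.silver.{tableName}"            # Snowflake source
    dbxTable    = f"patient_import.silver.{tableName}"     # Unity Catalog destination
    parquetPath = f"{tableName}/{tableName}_1.snappy.parquet"  # object within the ADLS container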

Export process¶

Each table is exported from Snowflake as a collection of Snappy-compressed Parquet objects, each about 20 MB, labelled:

tableName_<increment count>.snappy.parquet

The Parquet objects are stored in an ADLS blob container, each collection stored under a folder called tableName within the container.

This per-table folder structure is generated automatically by the COPY INTO statements used in the code; an illustrative sketch of the export statement is shown below.
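
For orientation, the export statement used later in the notebook has this general shape (placeholders in angle brackets; MAX_FILE_SIZE is an optional Snowflake setting included here only to illustrate how the ~20 MB object size could be tuned, and it is not used in the notebook):

    COPY INTO @<stage>/<tableName>/<tableName>
        FROM <tableName>
        OVERWRITE = TRUE
        HEADER = TRUE
        FILE_FORMAT = (TYPE = PARQUET)
        MAX_FILE_SIZE = 20971520;  -- bytes (optional)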

Import process¶

The import process takes each table's folder and automatically ingests all of its Parquet objects, reconstituting the table within Databricks.

Databricks needs a hierarchical namespace, similar to HDFS:

  • The Azure WASBS interface overlays a hierarchy "emulation" on Azure blob storage by incorporating "/" in the blob names, similar to the way AWS S3 works.
  • Azure ABFSS uses the more efficient hierarchy features built into ADLS Gen2, which are available when the storage account is created with the hierarchical namespace ("hierarchy enabled") setting.
  • Since WASBS works either way, we use it for simplicity. However, we also ran the code using the ABFSS access method, which we show commented out; both URI forms are illustrated below.
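
For reference, the two URI forms look like this (the account and container placeholders correspond to the variables defined later in the notebook):

    wasbs://<container>@<account>.blob.core.windows.net/<folder>
    abfss://<container>@<account>.dfs.core.windows.net/<folder>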

Prerequisites¶

  • An ADLS Storage Account provisioned with a Storage Container, and a SAS token generated for it and saved in Databricks Secrets (one way to generate the token is sketched after this list).
  • Access credentials to the Snowflake data, saved in Databricks Secrets.
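
One way to generate the container SAS token is with the Azure CLI. This is a hedged sketch, not the exact command used for this project; adjust the account and container names, permissions, and expiry to your environment and security policy:

    az storage container generate-sas \
        --account-name <YOUR ADLS ACCOUNT NAME> \
        --name <YOUR CONTAINER NAME> \
        --permissions racwdl \
        --expiry 2026-12-31T00:00Z \
        --output tsv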

Packages & technical doc references¶

In [0]:
%pip install --upgrade snowflake-connector-python
%pip install --upgrade azure-storage-blob

"""
  https://docs.snowflake.com/en/sql-reference/sql/copy-into-location
  https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
  https://docs.databricks.com/en/ingestion/cloud-object-storage/copy-into/temporary-credentials.html
  https://docs.databricks.com/en/sql/language-manual/sql-ref-datatype-rules.html
"""
In [0]:
%restart_python

Initialize platform connections¶

Credential secrets¶

In [0]:
secretStore = "dbmigrate"

print(f"""
-- Use the databricks CLI to create the secrets. 
-- Run these commands from the databricks cluster web terminal

databricks secrets create-scope {secretStore}
databricks secrets list-scopes

databricks secrets put-secret {secretStore} ADLS_ACCOUNT --string-value "<YOUR ADLS ACCOUNT NAME>"
databricks secrets put-secret {secretStore} ADLS_SAS_TOKEN --string-value "<YOUR ADLS SAS TOKEN>"

databricks secrets put-secret {secretStore} SNOW_USER --string-value "<YOUR SNOWFLAKE USER ACCOUNT>"
databricks secrets put-secret {secretStore} SNOW_PWD --string-value "<YOUR SNOWFLAKE USER PASSWORD>"
databricks secrets put-secret {secretStore} SNOW_ACCOUNT --string-value "<YOUR SNOWFLAKE ACCOUNT>"

databricks secrets list-secrets {secretStore}

-- To retrieve a secret, first install jq to parse json output
apt-get update
apt-get install jq

-- Then retrieve the secret value and convert it from base64 to plain text
databricks secrets get-secret {secretStore} SNOW_USER | jq -r '.value' | base64 --decode; echo

""")
In [0]:
# Load all the secrets into a dictionary for convenience
secretStore = "dbmigrate"
secrets = {s.key: dbutils.secrets.get(scope=secretStore, key=s.key)
           for s in dbutils.secrets.list(secretStore)}
secrets

Connection parameters¶

In [0]:
### ADLS container identifiers
azStorageAccount, azSAStoken = secrets['ADLS_ACCOUNT'], secrets['ADLS_SAS_TOKEN']
azBlobContainer = "kurtssnowbrickexchange"

# Snowflake ADLS access
snowBlobURL = f'azure://{azStorageAccount}.blob.core.windows.net/{azBlobContainer}'
snowCreds = { 'warehouse': 'DEMO_WH',
              'user': secrets['SNOW_USER'],
              'account': secrets['SNOW_ACCOUNT'],
              'password': secrets['SNOW_PWD'] }
snowStage = "DATABRICKS_IMPORT"
snowSchema = "patient.silver"

# Databricks ADLS access
# dbxBlobURL = f"abfss://{azBlobContainer}@{azStorageAccount}.dfs.core.windows.net"
dbxBlobURL = f"wasbs://{azBlobContainer}@{azStorageAccount}.blob.core.windows.net"
dbxSchema = "patient_import.silver"  # Unity Catalog name for the import location

# Populate spark keys that can serve as variables in the notebook SQL cells later
spark.conf.set("stagefile.accessToken", azSAStoken)
spark.conf.set("stagefile.catalog", dbxSchema.split('.')[0])
spark.conf.set("stagefile.schema", dbxSchema)

Snowflake connection¶

In [0]:
# Set up the Snowflake connection used to execute the per-table exports in the main cycle

import snowflake.connector
# Even though we fully qualify the schema.table in the SQL statements later,
# we also force the schema default in the session connection for redundancy
snowCreds |= { 'database': snowSchema.split(".")[0], 'schema': snowSchema.split(".")[1] }
snow_con = snowflake.connector.connect(**snowCreds)

def snow_exec(sql, snow_con=snow_con):
    cur = snow_con.cursor().execute(sql)
    result = { 'columns': [desc[0] for desc in cur.description],
               'data': cur.fetchall() }
    cur.close()
    return result

# Packaging SQL results in a DataFrame gets them nicely displayed in the Databricks notebook
import pandas as pd
pd.set_option('display.max_colwidth', None)
prettify = lambda sql_result: pd.DataFrame(**sql_result)
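
# Optional smoke test of the helpers above (assumes the Snowflake connection succeeded);
# CURRENT_VERSION() is just a lightweight query that confirms the session works end to end.
prettify(snow_exec("SELECT CURRENT_VERSION()"))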

Azure ADLS Container¶

In [0]:
# See what's already in the Azure container

from azure.storage.blob import ContainerClient

azGetBlobs = ContainerClient(
                account_url=f"https://{azStorageAccount}.blob.core.windows.net",
                container_name=azBlobContainer, credential=azSAStoken)

# Iterate through the blobs and print the names
for blob in azGetBlobs.list_blobs(): print(blob.name)

Setup export and import points¶

In [0]:
# Create the snowflake stage area connected to the external Azure container
# This only needs to be run once

prettify(snow_exec(f"""
CREATE STAGE IF NOT EXISTS {snowStage} 
	URL = '{snowBlobURL}', 
	CREDENTIALS = ( AZURE_SAS_TOKEN = '{azSAStoken}' ) 
	DIRECTORY = ( ENABLE = true );
""" ))
In [0]:
%sql
-- Make sure the Unity Catalog and schema exist; this only needs to be run once
CREATE CATALOG IF NOT EXISTS ${stagefile.catalog};
CREATE SCHEMA IF NOT EXISTS ${stagefile.schema};

-- DROP CATALOG IF EXISTS ${stagefile.catalog} CASCADE;

Main "per table" process¶

In [0]:
""" Initial tables of interest:
PATIENTS
ADMISSIONS
DRGCODES
D_ICD_DIAGNOSES
DIAGNOSES_ICD
D_ICD_PROCEDURES
PROCEDURES_ICD
NOTE_DISCHARGE
TRANSFERS
"""

# The table to export and then import; change this to whatever you want
# tableName = "DIAGNOSES_ICD" 
tableName = input("tableName")
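
# Hypothetical alternative (not used in this notebook): migrate several tables in one pass
# by wrapping the export/import cells below in functions and looping over a list, e.g.
#   for tableName in ["PATIENTS", "ADMISSIONS", "DRGCODES"]:
#       export_table(tableName)   # hypothetical helper wrapping the Snowflake COPY INTO cell
#       import_table(tableName)   # hypothetical helper wrapping the Databricks COPY INTO cells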
In [0]:
# Set up table-specific processing variables that can be used in the SQL notebook cells
spark.conf.set("stagefile.import", dbxBlobURL + "/" + tableName)  # ADLS container folder path
spark.conf.set("stagefile.target", dbxSchema + "." + tableName)   # Unity Catalog table path

# Dump out all the spark keys that will be used in the SQL for debugging purposes
{k: v for k, v in spark.conf.getAll.items() if k.startswith("stagefile.")}
In [0]:
# Execute the Snowflake COPY INTO to export tableName into the ADLS container as a Parquet file collection
# Make note of the total rows and compare with the import cell later

prettify(snow_exec(f"""
copy into @{snowStage}/{tableName}/{tableName}
    from {tableName}
    OVERWRITE = TRUE
    HEADER = TRUE
    FILE_FORMAT = (TYPE = PARQUET);
"""))
In [0]:
# Check that the blobs now exist in the container

for blob in azGetBlobs.list_blobs(name_starts_with=tableName): print(blob.name)
In [0]:
%sql
-- Set up the target table in Unity Catalog
-- DROP TABLE ${stagefile.target};
CREATE TABLE IF NOT EXISTS ${stagefile.target};
In [0]:
%sql
-- COPY INTO will insert into an existing table, so clear it out in case an old copy is there
TRUNCATE TABLE ${stagefile.target}; 

COPY INTO ${stagefile.target}
FROM "${stagefile.import}"
  WITH ( CREDENTIAL( AZURE_SAS_TOKEN = "${stagefile.accessToken}" ))
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true', 'force' = 'true');  -- 'force' reloads files even if they were loaded before
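
-- The COPY INTO result shown in the cell output reports how many rows were loaded (columns such as
-- num_affected_rows / num_inserted_rows, which may vary by runtime); compare this with the row
-- count reported by the Snowflake export cell above.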
In [0]:
# Snowflake meta-data for the table
prettify(snow_exec(f"""
    DESCRIBE TABLE {snowSchema}.{tableName}
"""))
In [0]:
# Snowflake row counts for the table
prettify(snow_exec(f"""
    SELECT COUNT(SUBJECT_ID), COUNT(DISTINCT SUBJECT_ID)
    FROM {snowSchema}.{tableName}
"""))
In [0]:
# Databricks meta-data for the table
display(spark.sql(f"""
    DESCRIBE TABLE {dbxSchema}.{tableName}
"""))
In [0]:
%sql
-- The same thing using a SQL magic cell
DESCRIBE TABLE ${stagefile.target}
In [0]:
%sql
-- Databricks row counts for the table
SELECT COUNT(SUBJECT_ID), COUNT(DISTINCT SUBJECT_ID)
FROM ${stagefile.target}

Business SQL Test¶

In [0]:
# Top 5 DRGs with the highest number of pneumonia diagnoses, with mortality counts

business_sql_test = \
"""
SELECT
   concat(DR.DESCRIPTION,' (', DR.DRG_CODE, ')') AS "DRG Description",
   count(DISTINCT A.HADM_ID) AS "Pneumonia Diagnosis",
   count(
      DISTINCT CASE
         WHEN A.HOSPITAL_EXPIRE_FLAG = TRUE
         THEN A.HADM_ID
         ELSE NULL
      END
   ) AS "Mortality Count"
FROM ADMISSIONS AS A
JOIN DIAGNOSES_ICD AS D ON A.HADM_ID = D.HADM_ID
JOIN D_ICD_DIAGNOSES AS DI ON D.ICD_VER_CODE = DI.ICD_VER_CODE
JOIN DRGCODES AS DR ON A.HADM_ID = DR.HADM_ID
WHERE
   EXTRACT(YEAR FROM A.ADMITTIME) = 2124
   AND DI.LONG_TITLE ILIKE '%pneumonia%'
GROUP BY
   DR.DESCRIPTION,
   DR.DRG_CODE
ORDER BY
   "Pneumonia Diagnosis" DESC NULLS LAST
LIMIT 5
"""
In [0]:
# Databricks Top 5 DRGs with the highest number of pneumonia diagnoses

spark.sql(f"USE {dbxSchema}") 
display(spark.sql(business_sql_test.replace('"','`')))  # Databricks uses backticks where Snowflake uses double quotes for quoted identifiers
In [0]:
# Snowflake Top 5 DRGs with the highest number of pneumonia diagnoses
snow_exec(f"USE {snowSchema}")
prettify(snow_exec(business_sql_test))