Snowflake
Load data from Data Platform to Snowflake
From Destination AWS S3
Prerequisites
- Have Dreamdata's data platform AWS S3 destination enabled (see this guide to enable it if you haven't already)
<S3_BUCKET> refers to the bucket name used in Dreamdata's data platform AWS S3 destination.
Configuring the Snowflake storage integration
This section is based on Snowflake's documentation, which is already very complete.
- Create an S3 storage integration (Snowflake)
-- Create the external-stage storage integration that lets Snowflake
-- assume the AWS role and read from the Dreamdata S3 bucket.
CREATE STORAGE INTEGRATION <S3_INT>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<AWS_ROLE_ARN>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<S3_BUCKET>');
From now on, <S3_INT> refers to the storage integration created above.
- Fetch AWS credentials created by Snowflake as part of the storage integration (Snowflake)
-- Inspect the integration to retrieve the credentials Snowflake generated;
-- the AWS IAM user ARN appears under STORAGE_AWS_IAM_USER_ARN.
DESCRIBE INTEGRATION <S3_INT>;
-- Find AWS External ID under STORAGE_AWS_EXTERNAL_ID
From now on, <STORAGE_AWS_IAM_USER_ARN> will be used to refer to the AWS IAM user ARN.
From now on, <STORAGE_AWS_EXTERNAL_ID> will be used to refer to the AWS external ID.
- Create an AWS Role (AWS)
From now on, <AWS_ROLE_ARN> refers to the ARN of the newly created role.
- Create an AWS S3 policy that allows access to the S3 bucket (AWS)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": "arn:aws:s3:::<S3_BUCKET>/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<S3_BUCKET>"
}
]
}
- Attach the policy created in (4) to the role created in (3) (AWS)
- Only allow <STORAGE_AWS_IAM_USER_ARN> to assume the role created in (3). To do so, change the role's trust policy to (AWS):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "<STORAGE_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
}
}
}
]
}
Load the data
- Define the variables that we will need
-- vars
SET BUCKET = '<S3_BUCKET>';
SET STORAGE_INT = '<S3_INT>';
- Parse the receipt file
SET RECEIPT_FILE = 'receipt.json';
SET RECEIPT_URL = concat($BUCKET, '/', $RECEIPT_FILE);
-- External stage pointing at the receipt file that lists every exported table.
CREATE OR REPLACE STAGE receipt
STORAGE_INTEGRATION = $STORAGE_INT
URL = $RECEIPT_URL
FILE_FORMAT = (TYPE = JSON);
-- Flatten the receipt's `tables` object into one row per table:
-- the flattened key is the table name, the value holds its data folder.
CREATE OR REPLACE TEMPORARY TABLE receipt AS
SELECT
key AS table_name,
-- cast the VARIANT to string so the path is not wrapped in JSON quotes;
-- the dangling comma after this column (a syntax error) has been removed
$BUCKET || '/' || value:folder::string AS folder
FROM
@receipt,
LATERAL FLATTEN(PARSE_JSON($1):tables);
- Create a stage per table found in the receipt
CREATE OR REPLACE PROCEDURE STAGES()
RETURNS string
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
// Creates one external stage per table listed in the temporary `receipt` table.
try {
    // Fetch every (table_name, folder) pair discovered in the receipt file.
    var stmt = snowflake.createStatement(
        {sqlText: "SELECT TABLE_NAME, FOLDER FROM receipt"});
    var rs = stmt.execute();
    while (rs.next()) {
        var stage = rs.getColumnValue("TABLE_NAME");
        var url = rs.getColumnValue("FOLDER");
        // NOTE: the storage-integration name is hard-coded here; it must
        // match the integration created earlier (<S3_INT>).
        var createStageStatement = "CREATE OR REPLACE STAGE " + stage + " " +
            "STORAGE_INTEGRATION = s3_int " +
            "URL = '" + url + "' " +
            "FILE_FORMAT = (TYPE = PARQUET)";
        snowflake.createStatement({sqlText: createStageStatement}).execute();
    }
    // BUG FIX: the original `return` sat inside the while loop, so only the
    // FIRST stage was ever created. Return only after the loop completes.
    return "stages created successfully";
} catch (err) {
    // Surface a descriptive error if any statement fails.
    throw "Error creating stages: " + err.message;
}
$$;
CALL STAGES();
SHOW stages;
- See the data
-- The staged files are Parquet, so each row already arrives as a structured
-- VARIANT exposed as $1; PARSE_JSON is unnecessary here, and the original
-- `SELECT * from @stage parse_json($1)` form was a syntax error.
SELECT $1 FROM @companies;
SELECT $1 FROM @contacts;
SELECT $1 FROM @events;
SELECT $1 FROM @paid_ads;
SELECT $1 FROM @revenue;
SELECT $1 FROM @revenue_attribution;