Skip to content

Instantly share code, notes, and snippets.

@b09dan
Last active March 12, 2025 21:22
Show Gist options
  • Select an option

  • Save b09dan/68980136ddf04b1da03b412d3ef3b1db to your computer and use it in GitHub Desktop.

Select an option

Save b09dan/68980136ddf04b1da03b412d3ef3b1db to your computer and use it in GitHub Desktop.
snowpark_sessions_experiment.py
import snowflake.snowpark.exceptions
from snowflake.snowpark import DataFrame, Session
from snowflake.snowpark.functions import col, concat, lit

from libs.support.snowflake_connection import (
    execute_snowflake_query,
    get_snowflake_spark_connection,
    get_snowflake_sql_connection,
)
def create_tables():
    """(Re)create the three tables used by the experiment.

    Issues one ``CREATE OR REPLACE TABLE`` statement each for ``dim_orders``,
    ``dim_clients`` and ``dim_postal_labels``, in that order, through
    ``execute_snowflake_query``.
    """
    # NOTE: the DDL text is kept verbatim (continuation lines start at column 0)
    # so the statements sent to Snowflake are unchanged.
    orders_ddl = """CREATE OR REPLACE TABLE dim_orders (
order_id INT PRIMARY KEY,
client_id INT,
order_date DATE,
amount NUMBER(10,2));"""

    clients_ddl = """CREATE OR REPLACE TABLE dim_clients (
client_id INT PRIMARY KEY,
first_name STRING,
last_name STRING,
address STRING,
postal_code STRING);"""

    postal_labels_ddl = """CREATE OR REPLACE TABLE dim_postal_labels (
order_id INT,
client_full_name STRING,
client_address STRING,
postal_code STRING,
postal_label STRING);"""

    # Source tables first, then the derived postal-labels table.
    for statement in (orders_ddl, clients_ddl, postal_labels_ddl):
        execute_snowflake_query(statement)
def main(session: Session) -> None:
    """Load sample orders/clients and derive postal labels in one transaction.

    Steps: (re)create the tables via ``create_tables``, switch the session to
    schema ``DEV.RAW_00``, turn autocommit off, open an explicit transaction,
    write the sample rows to ``DIM_ORDERS`` and ``DIM_CLIENTS``, join them into
    ``dim_postal_labels`` and commit. Autocommit is switched back on afterwards
    regardless of the outcome, and up to five result rows are printed.

    Args:
        session: Snowpark session used for every statement issued here.

    Raises:
        Exception: Any error raised while the transaction is open is re-raised
            unchanged after a ROLLBACK has been issued.
    """
    # 1-2. Create tables
    create_tables()

    # 3. Generate sample data
    orders_data = [
        (1, 101, '2023-01-01', 150.00),
        (2, 102, '2023-01-02', 200.00),
        (3, 103, '2023-01-03', 300.00),
    ]
    clients_data = [
        (101, 'John', 'Doe', '123 Main St', '10001'),
        (102, 'Jane', 'Smith', '456 Elm St', '10002'),
        (103, 'Bob', 'Johnson', '789 Oak St', '10003'),
    ]

    # Select the schema before touching any session state: if this fails,
    # autocommit has not been changed and no transaction has been opened.
    session.sql("USE SCHEMA DEV.RAW_00").collect()

    # 4. Disable autocommit and start transaction
    session.sql("ALTER SESSION SET AUTOCOMMIT = FALSE").collect()
    try:
        # BEGIN sits inside the outer try so that a failure to open the
        # transaction still restores autocommit in the finally clause.
        session.sql("BEGIN").collect()
        try:
            # 5. Create Snowpark DataFrames and
            # 6. Write data to dimension tables
            # NOTE(review): whether "truncate"-mode writes stay inside the open
            # transaction depends on the SQL Snowpark generates — verify.
            session.create_dataframe(
                orders_data,
                schema=["ORDER_ID", "CLIENT_ID", "ORDER_DATE", "AMOUNT"],
            ).write.mode("truncate").save_as_table("DIM_ORDERS")
            session.create_dataframe(
                clients_data,
                schema=["CLIENT_ID", "FIRST_NAME", "LAST_NAME", "ADDRESS", "POSTAL_CODE"],
            ).write.mode("truncate").save_as_table("DIM_CLIENTS")

            table_orders: DataFrame = session.table("DIM_ORDERS")
            table_clients: DataFrame = session.table("DIM_CLIENTS")

            # 7. Join orders and clients data
            joined_df = table_orders.join(
                table_clients,
                table_orders["client_id"] == table_clients["client_id"],
            )
            joined_data = joined_df.select(
                col("order_id"),
                # Separate first and last name with a space; a bare concat
                # would produce e.g. "JohnDoe".
                concat(col("first_name"), lit(" "), col("last_name")).alias("client_full_name"),
                col("address").alias("client_address"),
                col("postal_code"),
            )

            # 8. Prepare and write postal labels data
            # Simplified example using postal code as label
            postal_labels_df = joined_data.with_column("postal_label", col("postal_code"))
            postal_labels_df.write.mode("truncate").save_as_table("dim_postal_labels")

            # 9. Commit transaction
            session.sql("COMMIT").collect()
            print("Transaction committed successfully!")
        except Exception as e:
            session.sql("ROLLBACK").collect()
            print(f"Transaction rolled back due to error: {e}")
            # Bare raise re-raises the active exception with its traceback intact.
            raise
    finally:
        session.sql("ALTER SESSION SET AUTOCOMMIT = TRUE").collect()

    # 10. Display results
    result = session.table("dim_postal_labels").limit(5).collect()
    print("\nDim Postal Labels Data:")
    for row in result:
        print(row)
if __name__ == "__main__":
    # Script entry point: open a Snowpark session, run the experiment, and
    # always close the session afterwards.
    session = get_snowflake_spark_connection()
    try:
        main(session)
    finally:
        # Close the session even when main() raises so it is not leaked.
        session.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment