Last active
March 12, 2025 21:22
-
-
Save b09dan/68980136ddf04b1da03b412d3ef3b1db to your computer and use it in GitHub Desktop.
snowpark_sessions_experiment.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from snowflake.snowpark import Session, DataFrame | |
| from snowflake.snowpark.functions import col, concat | |
| import snowflake.snowpark.exceptions | |
| from libs.support.snowflake_connection import get_snowflake_spark_connection, get_snowflake_sql_connection, \ | |
| execute_snowflake_query | |
def create_tables():
    """Create (or replace) the three tables used by this experiment.

    Sends one ``CREATE OR REPLACE TABLE`` statement each for ``dim_orders``,
    ``dim_clients`` and ``dim_postal_labels`` to ``execute_snowflake_query``,
    in that order. Return values of the helper are ignored.

    NOTE(review): ``execute_snowflake_query`` is a project helper; which
    connection/schema it runs against is not visible here -- confirm it
    targets the same schema the Snowpark session later writes to.
    """
    # Source-of-truth orders table.
    orders_ddl = """CREATE OR REPLACE TABLE dim_orders (
order_id INT PRIMARY KEY,
client_id INT,
order_date DATE,
amount NUMBER(10,2));"""

    # Client master data, joined to orders on client_id.
    clients_ddl = """CREATE OR REPLACE TABLE dim_clients (
client_id INT PRIMARY KEY,
first_name STRING,
last_name STRING,
address STRING,
postal_code STRING);"""

    # Target table populated from the orders/clients join.
    postal_labels_ddl = """CREATE OR REPLACE TABLE dim_postal_labels (
order_id INT,
client_full_name STRING,
client_address STRING,
postal_code STRING,
postal_label STRING);"""

    # Run the statements one at a time, in declaration order.
    for statement in (orders_ddl, clients_ddl, postal_labels_ddl):
        execute_snowflake_query(statement)
def main(session: Session):
    """Load sample data and build postal labels between BEGIN and COMMIT.

    Steps, in order:

    1. (Re)create the tables via ``create_tables()``.
    2. Turn AUTOCOMMIT off on ``session``, issue ``BEGIN`` and switch to
       schema ``DEV.RAW_00``.
    3. Write the sample rows to ``DIM_ORDERS`` and ``DIM_CLIENTS``.
    4. Join orders to clients on ``client_id`` and write the result to
       ``dim_postal_labels``.
    5. ``COMMIT``; on any exception ``ROLLBACK`` and re-raise.
    6. Set AUTOCOMMIT back to TRUE (always), then print up to five rows of
       ``dim_postal_labels``.

    Args:
        session: An open Snowpark session. It is not closed here.

    Raises:
        Exception: Any error raised while the transaction is open is
            re-raised unchanged after the ``ROLLBACK`` has been issued.
    """
    # Imported locally: the module-level import only brings in ``col`` and
    # ``concat``, and the name separator below needs a literal column.
    from snowflake.snowpark.functions import lit

    # 1-2. Create tables
    create_tables()

    # 3. Generate sample data
    orders_data = [
        (1, 101, '2023-01-01', 150.00),
        (2, 102, '2023-01-02', 200.00),
        (3, 103, '2023-01-03', 300.00),
    ]
    clients_data = [
        (101, 'John', 'Doe', '123 Main St', '10001'),
        (102, 'Jane', 'Smith', '456 Elm St', '10002'),
        (103, 'Bob', 'Johnson', '789 Oak St', '10003'),
    ]

    # 4. Disable autocommit and start transaction
    session.sql("ALTER SESSION SET AUTOCOMMIT = FALSE").collect()
    try:
        # BEGIN / USE SCHEMA live inside the try so that a failure in either
        # one still reaches the rollback and the AUTOCOMMIT reset below.
        session.sql("BEGIN").collect()
        session.sql("USE SCHEMA DEV.RAW_00").collect()

        # 5. Create Snowpark DataFrames and
        # 6. Write data to dimension tables
        session.create_dataframe(
            orders_data,
            schema=["ORDER_ID", "CLIENT_ID", "ORDER_DATE", "AMOUNT"],
        ).write.mode("truncate").save_as_table("DIM_ORDERS")
        session.create_dataframe(
            clients_data,
            schema=["CLIENT_ID", "FIRST_NAME", "LAST_NAME", "ADDRESS", "POSTAL_CODE"],
        ).write.mode("truncate").save_as_table("DIM_CLIENTS")

        table_orders: DataFrame = session.table("DIM_ORDERS")
        table_clients: DataFrame = session.table("DIM_CLIENTS")

        # 7. Join orders and clients data
        joined_df = table_orders.join(
            table_clients,
            table_orders["client_id"] == table_clients["client_id"],
        )
        joined_data = joined_df.select(
            col("order_id"),
            # Separate first and last name with a space; plain concat of the
            # two columns would yield e.g. "JohnDoe".
            concat(col("first_name"), lit(" "), col("last_name")).alias("client_full_name"),
            col("address").alias("client_address"),
            col("postal_code"),
        )

        # 8. Prepare and write postal labels data
        # Simplified example using postal code as label
        postal_labels_df = joined_data.with_column("postal_label", col("postal_code"))
        postal_labels_df.write.mode("truncate").save_as_table("dim_postal_labels")

        # 9. Commit transaction
        session.sql("COMMIT").collect()
        print("Transaction committed successfully!")
    except Exception as e:
        session.sql("ROLLBACK").collect()
        print(f"Transaction rolled back due to error: {str(e)}")
        # Bare raise re-raises the active exception as-is.
        raise
    finally:
        # Always hand the session back with autocommit enabled.
        session.sql("ALTER SESSION SET AUTOCOMMIT = TRUE").collect()

    # 10. Display results
    result = session.table("dim_postal_labels").limit(5).collect()
    print("\nDim Postal Labels Data:")
    for row in result:
        print(row)
if __name__ == "__main__":
    # Script entry point: open a Snowpark session, run the experiment, and
    # close the session even when main() raises so a failed run does not
    # leave the connection open.
    session = get_snowflake_spark_connection()
    try:
        main(session)
    finally:
        session.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment