Last active
February 17, 2025 19:03
-
-
Save capableguptadotcom/84adf1c1acaaff9057ff661d7704ccd7 to your computer and use it in GitHub Desktop.
Profiles data of SQL tables.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from pyspark.sql import SparkSession | |
| from pyspark.sql.functions import col, count, countDistinct, regexp_extract | |
| from pyspark.sql.types import ( | |
| StructType, StructField, StringType, BooleanType, ArrayType, LongType, DoubleType | |
| ) | |
# Initialize Spark session
spark = SparkSession.builder.appName("MetadataExtractor").getOrCreate()

# JDBC configuration for MS SQL Server (update with your connection details).
# NOTE(review): credentials are hard-coded placeholders here — in real use,
# load them from a secrets store or environment, not source control.
jdbc_url = "jdbc:sqlserver://your_server:1433;databaseName=your_db"
connection_properties = {
    "user": "your_user",
    "password": "your_password",
    # Microsoft JDBC driver class; the driver JAR must be on the Spark classpath.
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
def get_basic_column_metadata(table_name, schema_name="dbo"):
    """
    Retrieve structural metadata for the specified table from INFORMATION_SCHEMA.

    Args:
        table_name: Table whose columns are described.
        schema_name: Database schema of the table (default "dbo").

    Returns:
        A Spark DataFrame with one row per column (COLUMN_NAME, DATA_TYPE,
        CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE, COLUMN_DEFAULT, is_primary_key),
        possibly empty (a warning is printed), or None on error.
    """
    # NOTE(review): table/schema names are interpolated directly into the SQL
    # text — they must come from trusted configuration, never user input
    # (SQL injection risk).
    # FIX: the PK subquery now joins on schema and table name as well as
    # constraint name — a constraint name is only guaranteed unique within its
    # schema, so joining on CONSTRAINT_NAME alone can cross-match schemas.
    query = f"""
    (SELECT
        c.COLUMN_NAME,
        c.DATA_TYPE,
        c.CHARACTER_MAXIMUM_LENGTH,
        c.IS_NULLABLE,
        c.COLUMN_DEFAULT,
        CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END AS is_primary_key
    FROM INFORMATION_SCHEMA.COLUMNS c
    LEFT JOIN (
        SELECT ku.TABLE_NAME, ku.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS ku
            ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
            AND tc.TABLE_SCHEMA = ku.TABLE_SCHEMA
            AND tc.TABLE_NAME = ku.TABLE_NAME
        WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            AND ku.TABLE_SCHEMA = '{schema_name}'
    ) pk ON c.TABLE_NAME = pk.TABLE_NAME AND c.COLUMN_NAME = pk.COLUMN_NAME
    WHERE c.TABLE_NAME = '{table_name}' AND c.TABLE_SCHEMA = '{schema_name}') as metadata
    """
    try:
        basic_meta_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
        if basic_meta_df.rdd.isEmpty():
            print(f"Warning: No metadata found for table '{schema_name}.{table_name}'.")
        return basic_meta_df
    except Exception as e:
        print(f"Error in get_basic_column_metadata: {e}")
        return None
def get_statistical_metadata(df, column_name):
    """
    Calculate statistical properties for a given column of a Spark DataFrame.

    Args:
        df: Spark DataFrame holding the table data (may be None).
        column_name: Column to profile.

    Returns:
        A dict with keys "nullable", "null_percentage", "distinct_values",
        "uniqueness_ratio", and "top_values" (top 5 non-null values for
        string columns, [] for other types). Returns an empty dict when the
        DataFrame is None/empty or the column does not exist.
    """
    stats = {}
    if df is None:
        print("DataFrame is None in get_statistical_metadata.")
        return stats
    # FIX: guard on column existence BEFORE any computation. Previously the
    # check sat after filter()/countDistinct(), so a missing column raised an
    # AnalysisException inside the try and returned partial stats with no
    # "top_values" key at all.
    if column_name not in df.columns:
        print(f"Column '{column_name}' not found in get_statistical_metadata.")
        return stats
    total_rows = df.count()
    if total_rows == 0:
        print("DataFrame is empty in get_statistical_metadata.")
        return stats
    try:
        null_count = df.filter(col(column_name).isNull()).count()
        distinct_count = df.select(countDistinct(col(column_name))).collect()[0][0]
        stats["nullable"] = null_count > 0
        stats["null_percentage"] = (null_count / total_rows) * 100
        stats["distinct_values"] = distinct_count
        stats["uniqueness_ratio"] = distinct_count / total_rows
        # Top values are only meaningful for string columns; others get [].
        if isinstance(df.schema[column_name].dataType, StringType):
            top_rows = (
                df.groupBy(column_name)
                .count()
                .orderBy(col("count").desc())
                .limit(5)
                .collect()
            )
            stats["top_values"] = [row[column_name] for row in top_rows if row[column_name] is not None]
        else:
            stats["top_values"] = []
    except Exception as e:
        print(f"Error in get_statistical_metadata for column '{column_name}': {e}")
    return stats
def get_constraints(table_name, column_name, schema_name="dbo"):
    """
    Retrieve constraint types (e.g., PRIMARY KEY, UNIQUE, FOREIGN KEY) that
    apply to a given column, using INFORMATION_SCHEMA views.

    Args:
        table_name: Table the column belongs to.
        column_name: Column whose constraints are listed.
        schema_name: Database schema (default "dbo").

    Returns:
        A list of constraint-type strings; [] when none apply or on error.
    """
    # FIX: CONSTRAINT_COLUMN_USAGE does not expose a CONSTRAINT_TYPE column on
    # SQL Server, so the original single-view query fails with "Invalid column
    # name 'CONSTRAINT_TYPE'". Join TABLE_CONSTRAINTS to obtain the type.
    query = f"""
    (SELECT
        ccu.CONSTRAINT_NAME,
        tc.CONSTRAINT_TYPE
    FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu
    JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
        ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
        AND ccu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
    WHERE ccu.TABLE_NAME = '{table_name}'
        AND ccu.TABLE_SCHEMA = '{schema_name}'
        AND ccu.COLUMN_NAME = '{column_name}') as constraints
    """
    try:
        cons = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties).collect()
        return [row.CONSTRAINT_TYPE for row in cons]
    except Exception as e:
        print(f"Error in get_constraints for column '{column_name}': {e}")
        return []
def detect_pattern(df, column_name, pattern):
    """
    Compute the fraction of non-null values in a string column that match the
    given regex pattern.

    Args:
        df: Spark DataFrame holding the table data (may be None).
        column_name: Column to test.
        pattern: Regex pattern (Java regex syntax, as used by Spark SQL).

    Returns:
        A float score in [0.0, 1.0]; 0.0 for missing/non-string columns,
        empty data, or on error.
    """
    # FIX: all early exits now return float 0.0 instead of int 0. Callers
    # store this score into a DoubleType field, and createDataFrame rejects
    # Python ints in double fields.
    if df is None or column_name not in df.columns:
        print(f"Column '{column_name}' not found in DataFrame in detect_pattern.")
        return 0.0
    if not isinstance(df.schema[column_name].dataType, StringType):
        return 0.0
    try:
        total_non_null = df.filter(col(column_name).isNotNull()).count()
        if total_non_null == 0:
            return 0.0
        # regexp_extract yields "" when the pattern does not match.
        match_count = df.filter(regexp_extract(col(column_name), pattern, 0) != "").count()
        return match_count / total_non_null
    except Exception as e:
        print(f"Error in detect_pattern for column '{column_name}': {e}")
        return 0.0
def get_full_metadata(table_name, sensitivity_mapping, schema_name="dbo"):
    """
    Retrieve full metadata for a table: structural info from
    INFORMATION_SCHEMA plus statistical profiling of the actual data.

    Args:
        table_name: Table to profile.
        sensitivity_mapping: Dict mapping column names to sensitivity labels;
            columns not present default to "Normal".
        schema_name: Database schema of the table (default "dbo").

    Returns:
        A Spark DataFrame with one metadata row per column, or None when the
        table data or its structural metadata cannot be read.
    """
    # Fully qualified table name (schema.table)
    qualified_table = f"{schema_name}.{table_name}"
    try:
        df = spark.read.jdbc(url=jdbc_url, table=qualified_table, properties=connection_properties)
    except Exception as e:
        print(f"Error reading table data for '{qualified_table}': {e}")
        return None
    basic_meta_df = get_basic_column_metadata(table_name, schema_name)
    if basic_meta_df is None or basic_meta_df.rdd.isEmpty():
        print("No basic metadata available. Exiting get_full_metadata.")
        return None
    # Schema for the metadata DataFrame we build below.
    result_schema = StructType([
        StructField("column_name", StringType(), True),
        StructField("data_type", StringType(), True),
        StructField("nullable", BooleanType(), True),
        StructField("default_value", StringType(), True),
        StructField("constraints", ArrayType(StringType()), True),
        StructField("distinct_values", LongType(), True),
        StructField("uniqueness_ratio", DoubleType(), True),
        StructField("top_values", ArrayType(StringType()), True),
        StructField("pattern_match", DoubleType(), True),
        StructField("sensitivity", StringType(), True),
        StructField("description", StringType(), True)
    ])
    results = []
    for row in basic_meta_df.collect():
        column_name = row["COLUMN_NAME"]
        # Compose the data type string, e.g. "varchar(255)" for char types.
        if row["CHARACTER_MAXIMUM_LENGTH"] is not None:
            data_type = f"{row['DATA_TYPE']}({row['CHARACTER_MAXIMUM_LENGTH']})"
        else:
            data_type = row["DATA_TYPE"]
        col_nullable = row["IS_NULLABLE"].strip().upper() == "YES"
        # Statistical profile computed from the actual table data.
        stats = get_statistical_metadata(df, column_name)
        # Constraints declared on this column.
        constraints = get_constraints(table_name, column_name, schema_name)
        # FIX: default must be the float 0.0 — pattern_match is declared
        # DoubleType above, and createDataFrame rejects Python ints in double
        # fields, so the previous `pattern_score = 0` broke profiling for any
        # table containing a non-email column.
        pattern_score = 0.0
        if "email" in column_name.lower():
            pattern_score = float(detect_pattern(df, column_name, r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"))
        result = {
            "column_name": column_name,
            "data_type": data_type,
            "nullable": col_nullable,
            "default_value": row["COLUMN_DEFAULT"],
            "constraints": constraints,
            # Coerce to the declared Spark types (LongType / DoubleType).
            "distinct_values": int(stats.get("distinct_values", 0) or 0),
            "uniqueness_ratio": float(stats.get("uniqueness_ratio", 0.0)),
            "top_values": stats.get("top_values", []),
            "pattern_match": pattern_score,
            "sensitivity": sensitivity_mapping.get(column_name, "Normal"),
            "description": ""  # Can be populated from external documentation.
        }
        results.append(result)
    try:
        metadata_df = spark.createDataFrame(results, schema=result_schema)
        return metadata_df
    except Exception as e:
        print(f"Error creating metadata DataFrame: {e}")
        return None
def get_foreign_keys(table_name, schema_name="dbo"):
    """
    Retrieve foreign key relationships for the specified table.

    Args:
        table_name: Parent table whose outgoing foreign keys are listed.
        schema_name: Database schema of the table (default "dbo").

    Returns:
        A Spark DataFrame (fk_name, column_name, referenced_table), or None
        on error.
    """
    # FIX: the schema_name parameter was accepted but never used, so the
    # query matched a same-named table in ANY schema. Filter on the parent
    # object's schema via OBJECT_SCHEMA_NAME.
    query = f"""
    (SELECT
        fk.name AS fk_name,
        c.name AS column_name,
        OBJECT_NAME(fk.referenced_object_id) AS referenced_table
    FROM sys.foreign_keys AS fk
    JOIN sys.foreign_key_columns AS fkc
        ON fk.object_id = fkc.constraint_object_id
    JOIN sys.columns AS c
        ON fkc.parent_object_id = c.object_id
        AND fkc.parent_column_id = c.column_id
    WHERE OBJECT_NAME(fk.parent_object_id) = '{table_name}'
        AND OBJECT_SCHEMA_NAME(fk.parent_object_id) = '{schema_name}') as fk_info
    """
    try:
        fk_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
        return fk_df
    except Exception as e:
        print(f"Error in get_foreign_keys for table '{table_name}': {e}")
        return None
def get_available_columns(table_name, schema_name="dbo"):
    """
    Return the column names of the specified table as a list of strings.

    Prints a notice and returns [] when the table has no columns or the
    metadata lookup fails.
    """
    query = f"""
    (SELECT COLUMN_NAME
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_NAME = '{table_name}'
    AND TABLE_SCHEMA = '{schema_name}') as cols
    """
    try:
        rows = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties).collect()
    except Exception as e:
        print(f"Error in get_available_columns: {e}")
        return []
    column_names = [r["COLUMN_NAME"] for r in rows]
    if not column_names:
        print(f"No columns found for table '{schema_name}.{table_name}'.")
    return column_names
def get_column_details_markdown(table_name, column_name, sensitivity_mapping, schema_name="dbo"):
    """
    Fetches metadata details for a specific column and returns them formatted in Markdown.

    NOTE: this runs get_full_metadata for the whole table and then filters to
    one column, so it is as expensive as profiling the entire table.
    Returns a Markdown error string when the table or column cannot be found.
    """
    metadata_df = get_full_metadata(table_name, sensitivity_mapping, schema_name)
    if metadata_df is None or metadata_df.rdd.isEmpty():
        return f"**Error:** No metadata available for table '{schema_name}.{table_name}'."
    try:
        column_meta = metadata_df.filter(col("column_name") == column_name).collect()
        if not column_meta:
            return f"**Error:** Column '{column_name}' not found in table '{schema_name}.{table_name}'."
        meta = column_meta[0]
        # The Markdown lines are intentionally unindented inside the f-string;
        # indenting them would change the rendered output.
        md = f"""### Column: {meta['column_name']}
- **Data Type:** {meta['data_type']}
- **Nullable:** {"Yes" if meta['nullable'] else "No"}
- **Default Value:** {meta['default_value'] if meta['default_value'] is not None else "None"}
- **Constraints:** {', '.join(meta['constraints']) if meta['constraints'] else "None"}
- **Distinct Values:** {meta['distinct_values']}
- **Uniqueness Ratio:** {meta['uniqueness_ratio']:.2f}
- **Top Values:** {', '.join(meta['top_values']) if meta['top_values'] else "None"}
- **Pattern Match Score:** {meta['pattern_match']:.2f}
- **Sensitivity:** {meta['sensitivity']}
- **Description:** {meta['description'] if meta['description'] else "No description provided."}
"""
        return md
    except Exception as e:
        return f"Error generating markdown details for column '{column_name}': {e}"
# === Example usage ===
if __name__ == "__main__":
    # Column-level sensitivity labels applied during profiling.
    sensitivity_config = {
        "customer_email": "PII (High)",
        "credit_card_number": "PCI (Critical)"
    }

    # Target table and schema (adjust as needed).
    target_table = "customers"
    target_schema = "dbo"

    # Full per-column profile of the table.
    profile_df = get_full_metadata(target_table, sensitivity_config, target_schema)
    if profile_df is not None:
        profile_df.show(truncate=False)

    # Outgoing foreign-key relationships.
    foreign_keys_df = get_foreign_keys(target_table, target_schema)
    if foreign_keys_df is not None:
        foreign_keys_df.show(truncate=False)

    # Column listing.
    print("Available columns:", get_available_columns(target_table, target_schema))

    # Markdown summary for one column of interest.
    print(get_column_details_markdown(target_table, "customer_email", sensitivity_config, target_schema))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment