Skip to content

Instantly share code, notes, and snippets.

@capableguptadotcom
Last active February 17, 2025 19:03
Show Gist options
  • Select an option

  • Save capableguptadotcom/84adf1c1acaaff9057ff661d7704ccd7 to your computer and use it in GitHub Desktop.

Select an option

Save capableguptadotcom/84adf1c1acaaff9057ff661d7704ccd7 to your computer and use it in GitHub Desktop.
Profiles the data of SQL tables (PySpark metadata extractor).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, regexp_extract
from pyspark.sql.types import (
StructType, StructField, StringType, BooleanType, ArrayType, LongType, DoubleType
)
# Initialize Spark session
# getOrCreate() reuses an already-running session instead of starting a new one.
spark = SparkSession.builder.appName("MetadataExtractor").getOrCreate()
# JDBC configuration for MS SQL Server (update with your connection details)
# NOTE(review): credentials are hard-coded placeholders — load them from a
# secret store or environment variables before running against a real server.
jdbc_url = "jdbc:sqlserver://your_server:1433;databaseName=your_db"
connection_properties = {
"user": "your_user",
"password": "your_password",
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
def get_basic_column_metadata(table_name, schema_name="dbo"):
    """
    Retrieve structural metadata for one table from INFORMATION_SCHEMA.

    The result has one row per column with COLUMN_NAME, DATA_TYPE,
    CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE, COLUMN_DEFAULT and a computed
    is_primary_key flag (1/0).

    Args:
        table_name: Table to inspect (without schema prefix).
        schema_name: Owning schema; defaults to "dbo".

    Returns:
        A Spark DataFrame of metadata rows (possibly empty, with a warning
        printed), or None if the JDBC read fails.
    """
    # SECURITY: table_name/schema_name are interpolated directly into SQL
    # because identifiers cannot be bound as JDBC parameters — only pass
    # trusted, catalog-derived names here.
    # FIX: also filter tc.TABLE_SCHEMA so a same-named PK constraint in a
    # different schema cannot leak into the join.
    query = f"""
    (SELECT
        c.COLUMN_NAME,
        c.DATA_TYPE,
        c.CHARACTER_MAXIMUM_LENGTH,
        c.IS_NULLABLE,
        c.COLUMN_DEFAULT,
        CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END AS is_primary_key
    FROM INFORMATION_SCHEMA.COLUMNS c
    LEFT JOIN (
        SELECT ku.TABLE_NAME, ku.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS ku
            ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
        WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            AND tc.TABLE_SCHEMA = '{schema_name}'
            AND ku.TABLE_SCHEMA = '{schema_name}'
    ) pk ON c.TABLE_NAME = pk.TABLE_NAME AND c.COLUMN_NAME = pk.COLUMN_NAME
    WHERE c.TABLE_NAME = '{table_name}' AND c.TABLE_SCHEMA = '{schema_name}') as metadata
    """
    try:
        basic_meta_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
        if basic_meta_df.rdd.isEmpty():
            print(f"Warning: No metadata found for table '{schema_name}.{table_name}'.")
        return basic_meta_df
    except Exception as e:
        print(f"Error in get_basic_column_metadata: {e}")
        return None
def get_statistical_metadata(df, column_name):
    """
    Compute profiling statistics for one column of *df*.

    Args:
        df: Spark DataFrame holding the table data (may be None).
        column_name: Name of the column to profile.

    Returns:
        Dict with keys: nullable, null_percentage, distinct_values,
        uniqueness_ratio, and top_values (five most frequent non-null
        values, string columns only). An empty dict is returned when df
        is None, empty, or the aggregation fails.
    """
    stats = {}
    if df is None:
        print("DataFrame is None in get_statistical_metadata.")
        return stats
    try:
        # PERF: one aggregation job instead of three separate actions
        # (count(), null filter + count(), countDistinct) — each action was
        # a full table scan.
        agg_row = df.agg(
            count("*").alias("total"),
            count(col(column_name)).alias("non_null"),  # count(col) skips NULLs
            countDistinct(col(column_name)).alias("distinct"),
        ).collect()[0]
        total_rows = agg_row["total"]
        if total_rows == 0:
            print("DataFrame is empty in get_statistical_metadata.")
            return stats
        null_count = total_rows - agg_row["non_null"]
        distinct_count = agg_row["distinct"]
        stats["nullable"] = null_count > 0
        stats["null_percentage"] = (null_count / total_rows) * 100
        stats["distinct_values"] = distinct_count
        stats["uniqueness_ratio"] = distinct_count / total_rows
        # Top values are only meaningful (and cheap to render) for strings;
        # always set the key so callers need no .get() fallback.
        stats["top_values"] = []
        if isinstance(df.schema[column_name].dataType, StringType):
            top_rows = (
                df.groupBy(column_name)
                .count()
                .orderBy(col("count").desc())
                .limit(5)
                .collect()
            )
            stats["top_values"] = [r[column_name] for r in top_rows if r[column_name] is not None]
    except Exception as e:
        print(f"Error in get_statistical_metadata for column '{column_name}': {e}")
    return stats
def get_constraints(table_name, column_name, schema_name="dbo"):
    """
    Return the constraint types (e.g. 'PRIMARY KEY', 'UNIQUE', 'FOREIGN KEY')
    that involve the given column.

    BUG FIX: INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE has no
    CONSTRAINT_TYPE column, so the original query failed on SQL Server.
    The view is joined to TABLE_CONSTRAINTS to obtain the type.

    Returns:
        List of constraint-type strings; empty on error or no constraints.
    """
    query = f"""
    (SELECT
        ccu.CONSTRAINT_NAME,
        tc.CONSTRAINT_TYPE
    FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu
    JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
        ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
        AND ccu.TABLE_SCHEMA = tc.TABLE_SCHEMA
    WHERE ccu.TABLE_NAME = '{table_name}'
        AND ccu.TABLE_SCHEMA = '{schema_name}'
        AND ccu.COLUMN_NAME = '{column_name}') as constraints
    """
    try:
        cons = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties).collect()
        return [row.CONSTRAINT_TYPE for row in cons]
    except Exception as e:
        print(f"Error in get_constraints for column '{column_name}': {e}")
        return []
def detect_pattern(df, column_name, pattern):
    """
    Return the fraction (0.0–1.0) of non-null values in *column_name* that
    match the regex *pattern*.

    BUG FIX: every early-exit path now returns 0.0 (float) instead of int 0,
    because callers feed the score into a DoubleType field and Spark's
    createDataFrame schema verification rejects Python ints for DoubleType.

    Returns:
        Float match ratio; 0.0 for missing/non-string/empty columns or on error.
    """
    if df is None or column_name not in df.columns:
        print(f"Column '{column_name}' not found in DataFrame in detect_pattern.")
        return 0.0
    if not isinstance(df.schema[column_name].dataType, StringType):
        # Pattern matching only makes sense for string columns.
        return 0.0
    try:
        total_non_null = df.filter(col(column_name).isNotNull()).count()
        if total_non_null == 0:
            return 0.0
        # regexp_extract returns "" when the pattern does not match.
        match_count = df.filter(regexp_extract(col(column_name), pattern, 0) != "").count()
        return match_count / total_non_null
    except Exception as e:
        print(f"Error in detect_pattern for column '{column_name}': {e}")
        return 0.0
def get_full_metadata(table_name, sensitivity_mapping, schema_name="dbo"):
    """
    Build a profiling DataFrame (one row per column) for schema.table.

    Combines INFORMATION_SCHEMA structure, per-column statistics,
    constraint types, an email-pattern score for columns whose name
    contains "email", and a sensitivity label from *sensitivity_mapping*.

    Args:
        table_name: Table to profile (without schema prefix).
        sensitivity_mapping: Dict of column_name -> sensitivity label;
            columns not present default to "Normal".
        schema_name: Owning schema; defaults to "dbo".

    Returns:
        Spark DataFrame matching result_schema, or None on failure.
    """
    # Fully qualified table name (schema.table)
    qualified_table = f"{schema_name}.{table_name}"
    try:
        df = spark.read.jdbc(url=jdbc_url, table=qualified_table, properties=connection_properties)
    except Exception as e:
        print(f"Error reading table data for '{qualified_table}': {e}")
        return None
    basic_meta_df = get_basic_column_metadata(table_name, schema_name)
    if basic_meta_df is None or basic_meta_df.rdd.isEmpty():
        print("No basic metadata available. Exiting get_full_metadata.")
        return None
    # Schema of the metadata DataFrame returned to the caller.
    result_schema = StructType([
        StructField("column_name", StringType(), True),
        StructField("data_type", StringType(), True),
        StructField("nullable", BooleanType(), True),
        StructField("default_value", StringType(), True),
        StructField("constraints", ArrayType(StringType()), True),
        StructField("distinct_values", LongType(), True),
        StructField("uniqueness_ratio", DoubleType(), True),
        StructField("top_values", ArrayType(StringType()), True),
        StructField("pattern_match", DoubleType(), True),
        StructField("sensitivity", StringType(), True),
        StructField("description", StringType(), True)
    ])
    results = []
    for row in basic_meta_df.collect():
        column_name = row["COLUMN_NAME"]
        # Compose the display type, e.g. "varchar(255)".
        if row["CHARACTER_MAXIMUM_LENGTH"] is not None:
            data_type = f"{row['DATA_TYPE']}({row['CHARACTER_MAXIMUM_LENGTH']})"
        else:
            data_type = row["DATA_TYPE"]
        col_nullable = row["IS_NULLABLE"].strip().upper() == "YES"
        stats = get_statistical_metadata(df, column_name)
        constraints = get_constraints(table_name, column_name, schema_name)
        # BUG FIX: must be a float — pattern_match is declared DoubleType and
        # Spark's createDataFrame verification rejects Python ints for it,
        # so any table without an email column previously failed here.
        pattern_score = 0.0
        if "email" in column_name.lower():
            pattern_score = float(detect_pattern(df, column_name, r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"))
        results.append({
            "column_name": column_name,
            "data_type": data_type,
            "nullable": col_nullable,
            "default_value": row["COLUMN_DEFAULT"],
            "constraints": constraints,
            # Coerce numerics so they always satisfy LongType/DoubleType.
            "distinct_values": int(stats.get("distinct_values", 0)),
            "uniqueness_ratio": float(stats.get("uniqueness_ratio", 0.0)),
            "top_values": stats.get("top_values", []),
            "pattern_match": pattern_score,
            "sensitivity": sensitivity_mapping.get(column_name, "Normal"),
            "description": ""  # This field can be updated with external documentation
        })
    try:
        metadata_df = spark.createDataFrame(results, schema=result_schema)
        return metadata_df
    except Exception as e:
        print(f"Error creating metadata DataFrame: {e}")
        return None
def get_foreign_keys(table_name, schema_name="dbo"):
    """
    Retrieve foreign-key relationships defined on schema.table.

    BUG FIX: the original accepted schema_name but never used it, so
    same-named tables in other schemas leaked into the result; the query
    now filters on OBJECT_SCHEMA_NAME of the parent table.

    Returns:
        Spark DataFrame with fk_name, column_name, referenced_table,
        or None if the JDBC read fails.
    """
    query = f"""
    (SELECT
        fk.name AS fk_name,
        c.name AS column_name,
        OBJECT_NAME(fk.referenced_object_id) AS referenced_table
    FROM sys.foreign_keys AS fk
    JOIN sys.foreign_key_columns AS fkc
        ON fk.object_id = fkc.constraint_object_id
    JOIN sys.columns AS c
        ON fkc.parent_object_id = c.object_id
        AND fkc.parent_column_id = c.column_id
    WHERE OBJECT_NAME(fk.parent_object_id) = '{table_name}'
        AND OBJECT_SCHEMA_NAME(fk.parent_object_id) = '{schema_name}') as fk_info
    """
    try:
        fk_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
        return fk_df
    except Exception as e:
        print(f"Error in get_foreign_keys for table '{table_name}': {e}")
        return None
def get_available_columns(table_name, schema_name="dbo"):
    """
    Fetch the column names available on the specified table.

    Returns a (possibly empty) list of column-name strings; an empty list
    is also returned when the JDBC read fails. A warning is printed when
    the table yields no columns.
    """
    query = f"""
(SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}'
AND TABLE_SCHEMA = '{schema_name}') as cols
"""
    try:
        cols_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
        names = [r["COLUMN_NAME"] for r in cols_df.collect()]
        if not names:
            print(f"No columns found for table '{schema_name}.{table_name}'.")
        return names
    except Exception as e:
        print(f"Error in get_available_columns: {e}")
        return []
def get_column_details_markdown(table_name, column_name, sensitivity_mapping, schema_name="dbo"):
    """
    Render the metadata of one column as a Markdown snippet.

    Runs the full profiling pipeline for the table, locates the row for
    *column_name*, and formats it; returns a '**Error:** ...' string when
    the table or column cannot be found, or an error message on failure.
    """
    metadata_df = get_full_metadata(table_name, sensitivity_mapping, schema_name)
    if metadata_df is None or metadata_df.rdd.isEmpty():
        return f"**Error:** No metadata available for table '{schema_name}.{table_name}'."
    try:
        matches = metadata_df.filter(col("column_name") == column_name).collect()
        if not matches:
            return f"**Error:** Column '{column_name}' not found in table '{schema_name}.{table_name}'."
        meta = matches[0]
        return f"""### Column: {meta['column_name']}
- **Data Type:** {meta['data_type']}
- **Nullable:** {"Yes" if meta['nullable'] else "No"}
- **Default Value:** {meta['default_value'] if meta['default_value'] is not None else "None"}
- **Constraints:** {', '.join(meta['constraints']) if meta['constraints'] else "None"}
- **Distinct Values:** {meta['distinct_values']}
- **Uniqueness Ratio:** {meta['uniqueness_ratio']:.2f}
- **Top Values:** {', '.join(meta['top_values']) if meta['top_values'] else "None"}
- **Pattern Match Score:** {meta['pattern_match']:.2f}
- **Sensitivity:** {meta['sensitivity']}
- **Description:** {meta['description'] if meta['description'] else "No description provided."}
"""
    except Exception as e:
        return f"Error generating markdown details for column '{column_name}': {e}"
# === Example usage ===
if __name__ == "__main__":
    # Columns carrying elevated data-sensitivity classifications.
    sensitivity_config = {
        "customer_email": "PII (High)",
        "credit_card_number": "PCI (Critical)"
    }
    # Target table and schema (adjust as needed).
    table_name = "customers"
    schema_name = "dbo"

    # Full per-column metadata profile.
    metadata_df = get_full_metadata(table_name, sensitivity_config, schema_name)
    if metadata_df is not None:
        metadata_df.show(truncate=False)

    # Foreign-key relationships of the table.
    fk_df = get_foreign_keys(table_name, schema_name)
    if fk_df is not None:
        fk_df.show(truncate=False)

    # Plain list of the table's column names.
    columns_list = get_available_columns(table_name, schema_name)
    print("Available columns:", columns_list)

    # Markdown report for a single column.
    markdown_details = get_column_details_markdown(table_name, "customer_email", sensitivity_config, schema_name)
    print(markdown_details)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment