Skip to content

Instantly share code, notes, and snippets.

@jkapila
Created February 6, 2025 18:04
Show Gist options
  • Select an option

  • Save jkapila/b97d881e2ae8b75141184ac0f7831601 to your computer and use it in GitHub Desktop.

Select an option

Save jkapila/b97d881e2ae8b75141184ac0f7831601 to your computer and use it in GitHub Desktop.
Distributive Assertive Regression in Python
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
# Defining Mean Absolute Percentage Error (MAPE)
def mape(actual, predicted, asmean=False):
    """Return the absolute percentage error between two aligned inputs.

    Parameters
    ----------
    actual, predicted
        Array-likes that support element-wise arithmetic (NumPy arrays or
        pandas Series). Plain Python lists are not supported.
    asmean : bool, default False
        When true, return the mean of the element-wise errors (the MAPE);
        otherwise return the element-wise errors rounded to 5 decimals.

    Returns
    -------
    A scalar when ``asmean`` is true, else an object of the same kind as the
    inputs holding the per-element percentage errors.

    Notes
    -----
    The denominator is ``|actual|`` so the error stays non-negative for
    negative targets. Zeros in ``actual`` still produce ``inf``/``nan``.
    """
    ape = np.abs((actual - predicted) * 100) / np.abs(actual)
    if asmean:
        return np.mean(ape)
    return np.round(ape, 5)
# Decile Binning Function
def decile_binner(data, target_var, splitname='splits', breaks=10):
    """Label each row with the 1-based bin its ``target_var`` value falls in.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame to annotate. It is modified in place: the ``splitname`` column
        is added (or overwritten), and the same object is returned.
    target_var : str
        Name of the numeric column to bin.
    splitname : str, default 'splits'
        Name of the column that receives the bin labels.
    breaks : int or sequence, default 10
        Forwarded to ``pandas.cut`` as ``bins``.

    Returns
    -------
    pandas.DataFrame
        ``data`` itself, with the bin-label column set.

    Notes
    -----
    ``pandas.cut`` with an integer ``bins`` builds equal-width intervals over
    the range of the target, so the bins are not guaranteed to hold equal
    numbers of rows (they are not quantile deciles).
    """
    # labels=False yields 0-based integer codes; shift them to start at 1.
    bin_codes = pd.cut(data[target_var], bins=breaks, labels=False, right=True)
    data[splitname] = bin_codes + 1
    return data
# Normalising the data
def normalise(x):
    """Min-max scale ``x`` onto the interval [0, 1].

    Parameters
    ----------
    x
        NumPy array or pandas object supporting element-wise arithmetic.

    Returns
    -------
    An object of the same kind as ``x`` holding ``(x - min) / (max - min)``.
    When every value is identical the range is zero; in that case zeros are
    returned instead of the all-NaN result of a 0/0 division.
    """
    lowest = np.min(x)
    span = np.max(x) - lowest
    # Only short-circuit for a scalar range; a non-scalar range (e.g. a
    # per-column reduction) keeps the plain element-wise division.
    if np.ndim(span) == 0 and span == 0:
        return x - lowest
    return (x - lowest) / span
# Creating Dummy Variables
def dummy_var(data, name, keep_var=False):
    """One-hot encode the column ``name`` of ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        Source frame; it is not modified.
    name : str
        Column to encode. Indicator columns are named ``f"{name}_{level}"``.
    keep_var : bool, default False
        When true, return the original frame with the indicator columns
        appended on the right; otherwise return only the indicator columns.

    Returns
    -------
    pandas.DataFrame
    """
    indicators = pd.get_dummies(data[name], prefix=name)
    if keep_var:
        return pd.concat([data, indicators], axis=1)
    return indicators
# KNN function to calculate nearest neighbors (using Euclidean distance)
def knn(mat, k):
    """Return the indices of the ``k`` nearest neighbours of every row.

    Distances are Euclidean and computed with a dense pairwise matrix, so
    memory use grows with the square of the number of rows.

    Parameters
    ----------
    mat : array-like
        Points as rows, shape ``(n_points, n_features)``. A 1-D input is
        treated as ``n_points`` one-dimensional points. DataFrames and nested
        lists are accepted and converted to a float array.
    k : int
        Number of neighbours requested per point. At most ``n_points - 1``
        neighbours are returned because a point is never its own neighbour.

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(n_points, min(k, n_points - 1))``; row ``i``
        lists the neighbours of point ``i`` from nearest to farthest, ties
        broken by lower index.
    """
    points = np.asarray(mat, dtype=float)
    if points.ndim == 1:
        points = points[:, None]
    n_points = points.shape[0]

    # Broadcasting (n, 1, d) - (n, d) gives every pairwise difference vector.
    dist_mat = np.linalg.norm(points[:, None] - points, axis=2)

    # Exclude each point from its own neighbour list explicitly. Dropping the
    # first sorted column instead is wrong when duplicate points exist,
    # because the zero self-distance is then not guaranteed to sort first.
    np.fill_diagonal(dist_mat, np.inf)

    n_neighbours = max(0, min(int(k), n_points - 1))
    ranked = np.argsort(dist_mat, axis=1, kind='stable')
    return ranked[:, :n_neighbours]
# Model and Data Segregation with KNN
def _mape_by_split(results, pred_col, out_columns):
    """Return a frame of mean APE per split for the predictions in ``pred_col``."""
    per_split = (
        results.groupby('splits')[['actuals', pred_col]]
        .apply(lambda grp: mape(grp['actuals'], grp[pred_col], asmean=True))
        .reset_index()
    )
    per_split.columns = out_columns
    return per_split


def _plot_mape_curve(split_index, mape_values, title):
    """Draw and show one MAPE-versus-split line plot."""
    plt.plot(split_index, mape_values, marker='o')
    plt.title(title)
    plt.xlabel("Split Index")
    plt.ylabel("Mean Absolute Percentage Error")
    plt.show()


def dafr(formula, data, model=None, family="gaussian", dec_front=2, dec_back=2,
         knn_neighbours=5, n_splits=10):
    """Fit one OLS model overall and separate OLS models per target segment.

    The rows are binned on the target with ``decile_binner`` and then grouped
    into a front segment (lowest ``dec_front`` bins), a back segment (highest
    ``dec_back`` bins) and a middle segment (everything between). An OLS model
    is fitted on each non-empty segment, and the per-bin MAPE of the single
    model and of the segmented models is computed and plotted.

    Parameters
    ----------
    formula : mapping
        Must provide ``formula['x']`` (feature column name(s)) and
        ``formula['y']`` (target column name).
    data : pandas.DataFrame
        Frame holding the feature and target columns.
    model : fitted model, optional
        Object with a ``predict`` method, used as the overall model. When
        ``None`` an OLS model with an intercept is fitted on all rows.
    family : str, default "gaussian"
        Accepted for interface compatibility; not used by this function.
    dec_front, dec_back : int, default 2
        Number of lowest / highest bins forming the front / back segment.
    knn_neighbours : int, default 5
        Accepted for interface compatibility; not used by this function.
    n_splits : int, default 10
        Number of bins the target is cut into.

    Returns
    -------
    dict
        ``'formula'``: the input formula; ``'models'``: fitted segment models
        keyed ``'Front_Model'``, ``'Mid_Model'``, ``'Back_Model'`` (only
        non-empty segments); ``'results'``: per-row frame with columns
        ``splits``, ``actuals``, ``original``, ``dec_pred``; ``'mapes'``:
        per-bin MAPE of both approaches; ``'split_freq'``: row count per bin.

    Raises
    ------
    ValueError
        If ``dec_front + dec_back`` exceeds ``n_splits`` (the front and back
        segments would overlap).

    Notes
    -----
    Shows two matplotlib figures as a side effect.
    """
    if dec_front + dec_back > n_splits:
        raise ValueError(
            "dec_front + dec_back must not exceed the number of splits"
        )

    X = data[formula['x']]
    y_actual = data[formula['y']]

    # statsmodels OLS has no implicit intercept, so add one explicitly.
    X_const = sm.add_constant(X)
    if model is None:
        model = sm.OLS(y_actual, X_const).fit()
    y_orig = model.predict(X_const)

    results = pd.DataFrame({'actuals': y_actual, 'original': y_orig})
    results = decile_binner(results, 'actuals', splitname='splits', breaks=n_splits)

    curve_ape = _mape_by_split(results, 'original', ['splits', 'mape'])
    _plot_mape_curve(
        curve_ape['splits'], curve_ape['mape'],
        "Plot of Unsplitted Absolute Percentage Error",
    )

    # Positional boolean masks keep the segment selection correct even when
    # the frame's index contains duplicate labels.
    splits = results['splits']
    back_start = n_splits - dec_back
    segments = (
        ('Front_Model', (splits <= dec_front).to_numpy()),
        ('Mid_Model', ((splits > dec_front) & (splits <= back_start)).to_numpy()),
        ('Back_Model', (splits > back_start).to_numpy()),
    )

    models = {}
    # Predictions are written back through the same mask that selected the
    # rows, so each prediction lands on the row it was computed for
    # regardless of how the data is ordered.
    results['dec_pred'] = np.nan
    for label, mask in segments:
        if not mask.any():
            continue
        X_segment = sm.add_constant(X.loc[mask])
        segment_model = sm.OLS(y_actual.loc[mask], X_segment).fit()
        models[label] = segment_model
        results.loc[mask, 'dec_pred'] = np.asarray(segment_model.predict(X_segment))

    curve_ape_dec = _mape_by_split(results, 'dec_pred', ['split', 'mape_dec'])
    curve_ape = pd.merge(
        curve_ape, curve_ape_dec[['split', 'mape_dec']],
        left_on='splits', right_on='split',
    )
    _plot_mape_curve(
        curve_ape['split'], curve_ape['mape_dec'],
        "Plot of Splitted Absolute Percentage Error",
    )

    dafr_result = {
        'formula': formula,
        'models': models,
        'results': results[['splits', 'actuals', 'original', 'dec_pred']],
        'mapes': curve_ape,
        'split_freq': results['splits'].value_counts(),
    }
    return dafr_result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment