Skip to content

Instantly share code, notes, and snippets.

@lmc2179
Last active June 15, 2020 21:37
Show Gist options
  • Select an option

  • Save lmc2179/103958a1b03ec9d65f26d64177fb0288 to your computer and use it in GitHub Desktop.

Select an option

Save lmc2179/103958a1b03ec9d65f26d64177fb0288 to your computer and use it in GitHub Desktop.
partial correlation analysis
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant
from statsmodels.stats.multitest import multipletests
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.datasets import load_boston
# Analysis of internal partial correlation structure of a matrix, with appropriate multiple testing correction
class PartialCorrelation(object):
def __init__(self, X, alpha=.05, multiple_test_method='holm'):
self.regression_models, self.regression_fits = self._run_regression(X)
self.uncorrected_p_values, self.p_values = self._get_p_values(self.regression_fits, alpha, multiple_test_method)
self.params = self._get_params(self.regression_fits)
self.significance = (self.p_values < alpha).astype('int')
def _run_regression(self, X):
models, fits = [], []
for i in range(X.shape[1]):
X_i = np.delete(X, i, axis=1)
y_i = X[:,i]
model = OLS(y_i, add_constant(X_i))
result = model.fit()
models.append(model)
fits.append(result)
return models, fits
def _get_p_values(self, regression_fits, alpha, multiple_test_method):
p_values = []
for i, rf in enumerate(regression_fits):
p = rf.pvalues
if i > 0:
p = np.concatenate((p[1:i+1], [p[0]], p[i+1:])).tolist()
p_values.append(p)
p_values = np.array(p_values)
corrected_p = multipletests(p_values.flatten())[1].reshape(p_values.shape)
return p_values, corrected_p
def _get_params(self, regression_fits):
params = []
for i, rf in enumerate(regression_fits):
p = rf.params
if i > 0:
p = np.concatenate((p[1:i+1], [p[0]], p[i+1:]))
params.append(p)
return np.array(params)
def get_p_values(self):
return self.p_values
def get_uncorrected_p_values(self):
return self.uncorrected_p_values
def get_significance(self):
return self.significance
def get_params(self):
return self.params
def get_sign(self):
return np.sign(self.significance * self.params)
# Synthetic data example
X = np.random.normal(0, 1, (1000, 10))
X = pd.DataFrame(X, columns = ['A', 'B', 'C', 'D', 'E',
'F', 'G', 'H', 'I', 'J'])
X['C'] = 2*X['F'] + np.random.normal(0, .1, 1000)
X['J'] = -2*X['F'] + np.random.normal(0, .1, 1000)
X['G'] += 1
alpha=.05
test_method='bonferroni'
pc = PartialCorrelation(X.values, alpha, test_method)
# p_values, reject_matrix, sign_matrix = coefficient_test_matrix(X.values, alpha, test_method)
sns.heatmap(pc.get_p_values(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
sns.heatmap(pc.get_significance(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
sns.heatmap(pc.get_sign(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
# Boston housing example
X = pd.DataFrame(load_boston()['data'], columns=load_boston()['feature_names'])
pc = PartialCorrelation(X.values, alpha, test_method)
sns.heatmap(pc.get_p_values(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
sns.heatmap(pc.get_significance(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
sns.heatmap(pc.get_sign(), xticklabels=X.columns, yticklabels=X.columns)
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment