Created
January 11, 2017 14:45
-
-
Save pjankiewicz/b4fca15272317dd963c03145dbfb0b0c to your computer and use it in GitHub Desktop.
Revisions
-
pjankiewicz created this gist
Jan 11, 2017. There are no files selected for viewing.
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from collections import defaultdict


class HClassifier(BaseEstimator, ClassifierMixin):
    """Hierarchical classifier.

    Trains one clone of `base_estimator` per internal node of the label
    hierarchy and predicts by walking each sample from a virtual root down
    the tree, one level per iteration.

    Parameters
    ----------
    base_estimator : estimator
        Classifier cloned and fitted for every internal node.
    min_obs : int, optional
        Skip training a node classifier when the node has fewer than
        `min_obs` observations.
    max_level : int, optional
        Train classifiers only for the first `max_level` levels of each
        label path. ``None`` (default) means no limit.
    max_depth : int, optional
        Safety cap on the length of a predicted path, guarding against
        cycles in the learned hierarchy (default 20, the original
        hard-coded value).
    """

    # Sentinel prepended to every label path as the virtual root node.
    ROOT = object()

    def __init__(self, base_estimator, min_obs=None, max_level=None,
                 max_depth=20):
        self.base_estimator = base_estimator
        self.min_obs = min_obs
        self.max_level = max_level
        self.max_depth = max_depth
        self.estimators = {}

    def fit(self, X, y, **args):
        """Fit one classifier per internal node of the hierarchy.

        `y` is an iterable of label paths (sequences), one path per row
        of X. Returns self.
        """
        y_with_root = self._add_root(y)
        reverse_index = self._generate_reverse_index(y_with_root)
        self._make_classifiers(X, y_with_root, reverse_index)
        return self

    def predict(self, X):
        """Predict a label path for every row of X.

        Starting at the virtual root, repeatedly ask the classifier of
        each sample's current node for the next level, until no sample
        sits at a node with a trained classifier, or a path exceeds
        `max_depth` (which indicates a cycle in the learned hierarchy).
        """
        y_hat = [[self.ROOT] for _ in range(X.shape[0])]
        while True:
            # Stop when no sample's current node has a trained classifier.
            classes = self._get_last_elems(y_hat)
            classes_present = {k for k in classes if k in self.estimators}
            if not classes_present:
                break
            circular = False
            for cl in classes_present:
                ind = np.where(classes == cl)[0]
                pred = self.estimators[cl].predict(X[ind, :])
                for i, p in zip(ind, pred):
                    y_hat[i].append(p)
                    # Guard against cycles in the learned hierarchy.
                    if len(y_hat[i]) > self.max_depth:
                        circular = True
            if circular:
                break
        # Strip the virtual root before returning the paths.
        return [path[1:] for path in y_hat]

    def _add_root(self, y):
        # Prepend the virtual ROOT sentinel to every label path.
        return [tuple([self.ROOT] + list(k)) for k in y]

    def _get_last_elems(self, v):
        # Last element of each inner list, as an (object) ndarray so that
        # predict() can vectorize the node lookup with np.where.
        return np.array([e[-1] for e in v])

    def _generate_reverse_index(self, y):
        """Map each internal node to the (row, level) pairs where it occurs.

        Only the first `max_level` levels are indexed; the last element of
        each path is a leaf and never receives a classifier.
        """
        reverse_index = defaultdict(list)
        for obs_i, obs_y in enumerate(y):
            for level, node in enumerate(obs_y[:-1]):
                # BUGFIX: the original guard `if self.max_level and ...`
                # was always False when max_level was left at its default
                # of None, producing an empty index and therefore NO node
                # classifiers at all.
                if self.max_level is None or level < self.max_level:
                    reverse_index[node].append((obs_i, level))
        return dict(reverse_index)

    def _make_classifiers(self, X, y, reverse_index, monitor=iter):
        """Train one classifier per indexed node.

        `monitor` may wrap the iteration (e.g. with a progress bar).
        """
        for root, indices in monitor(reverse_index.items()):
            ind = np.array([k[0] for k in indices])
            X_ = X[ind, :]
            # The target at each occurrence is the next element down the
            # path (position p + 1).
            y_ = np.array([y[i][p + 1] for i, p in indices])
            if self.min_obs and len(y_) < self.min_obs:
                continue  # too few observations for a reliable node model
            if len(set(y_)) == 1:
                # Single-class node: a constant predictor is sufficient.
                est = ConstantClassifier(y=y_[0])
            else:
                est = clone(self.base_estimator)
            est.fit(X_, y_)
            self.estimators[root] = est


class ConstantClassifier(BaseEstimator, ClassifierMixin):
    """Degenerate classifier that always predicts the constant `y`."""

    def __init__(self, y):
        self.y = y

    def fit(self, X, y, **args):
        # Nothing to learn; kept for estimator-API compatibility.
        return self

    def predict(self, X):
        # One copy of the constant per input row.
        return np.repeat(self.y, X.shape[0])