Source code for heamy.estimator

# coding:utf-8
import hashlib
import inspect
import logging

# NOTE:
# 'getargspec' is deprecated (and removed in Python 3.11);
# 'inspect.getfullargspec' is the modern replacement.
from inspect import getargspec

import numpy as np

from .cache import Cache, np_hash
from .dataset import Dataset
from .utils.main import concat, tsplit, reshape_1d
from .utils.main import report_score

REQUIRED_ARGS = set(['X_train', 'y_train', 'X_test', 'y_test'])
logger = logging.getLogger('heamy.estimator')
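
`REQUIRED_ARGS` spells out the contract for function-based estimators: `_check_estimator` below rejects any plain function whose signature lacks these four names. A minimal sketch of a conforming function, assuming scikit-learn is installed (the name `ridge_model` and the choice of `Ridge` are illustrative, not part of this module):

from sklearn.linear_model import Ridge

def ridge_model(X_train, y_train, X_test, y_test=None):
    # Any model that fits on the train split and predicts the test split works.
    model = Ridge(alpha=1.0)
    model.fit(X_train, y_train)
    return model.predict(X_test)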


class BaseEstimator(object):
    problem = None

    def __init__(self, dataset, estimator=None, parameters=None, name=None, use_cache=True):
        """Base class for estimators. This class should not be used directly."""
        if estimator is not None:
            self._estimator = estimator
        elif hasattr(self.__class__, 'estimator'):
            self._estimator = self.estimator
        else:
            raise ValueError('Missing required estimator object.')

        if callable(dataset):
            self.dataset = dataset()
        else:
            self.dataset = dataset

        if (not use_cache) and (not self.dataset.loaded):
            self.dataset.load()

        if parameters is not None:
            self.parameters = parameters
        else:
            self.parameters = {}

        self._hash = None
        self.probability = False
        self.use_cache = use_cache
        self._check_estimator()

        if name is None:
            name = '%s(%s)' % (self.estimator_name, self.hash)
        self._name = name

    def _check_estimator(self):
        est = self._estimator
        self._is_class = isinstance(est, type)
        if not self._is_class:
            args = set(getargspec(est).args)
            if 'self' in args:
                args.remove('self')
            if not REQUIRED_ARGS.issubset(args):
                raise ValueError('Missing required arguments. Please specify %s' % ','.join(REQUIRED_ARGS))

    @property
    def estimator_name(self):
        if hasattr(self.__class__, 'estimator'):
            name = self.__class__.__name__
        else:
            name = self._estimator.__name__
        return name

    @property
    def name(self):
        return self._name

    @property
    def hash(self):
        if self._hash is None:
            m = hashlib.new('md5')
            # generate hash from model's parameters
            for key in sorted(self.parameters.keys()):
                h_string = ('%s-%s' % (key, self._convert_parameter(self.parameters[key]))).encode('utf-8')
                m.update(h_string)
            m.update(self.estimator_name.encode('utf-8'))
            m.update(self.dataset.hash.encode('utf-8'))
            if not self._is_class:
                m.update(inspect.getsource(self._estimator).encode('utf-8'))
            self._hash = m.hexdigest()
        return self._hash

    def __repr__(self):
        return self._name

    def _convert_parameter(self, x):
        """If callable, return the object's source code."""
        if callable(x):
            return inspect.getsource(x)
        return x

    def _predict(self, X_train, y_train, X_test, y_test=None):
        if self._is_class:
            # class-based definition
            estimator = self._estimator(**self.parameters)
            estimator.fit(X_train, y_train)
            if self.probability:
                result = estimator.predict_proba(X_test)
            else:
                result = estimator.predict(X_test)

            if self.problem == 'classification' and self.probability:
                # return second column for binary classification
                if len(result.shape) == 2 and result.shape[1] == 2:
                    result = result[:, 1]
        else:
            # function-based definition
            result = self._estimator(X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
                                     **self.parameters)
        return result

    def predict(self):
        """Predict on the test data of the estimator's dataset."""
        if self.use_cache:
            c = Cache(self.hash, prefix='p')
            if c.available:
                logger.info('Loading %s\'s prediction from cache.' % self._name)
                prediction = c.retrieve('prediction')
                return prediction
            elif not self.dataset.loaded:
                self.dataset.load()

        prediction = self._predict(X_train=self.dataset.X_train, y_train=self.dataset.y_train,
                                   X_test=self.dataset.X_test)
        if self.use_cache:
            c.store('prediction', prediction)
        return prediction

    def _dhash(self, params):
        """Generate a hash of the dictionary object."""
        m = hashlib.new('md5')
        m.update(self.hash.encode('utf-8'))
        for key in sorted(params.keys()):
            h_string = ('%s-%s' % (key, params[key])).encode('utf-8')
            m.update(h_string)
        return m.hexdigest()

    def validate(self, scorer=None, k=1, test_size=0.1, stratify=False, shuffle=True, seed=100, indices=None):
        """Evaluate score by cross-validation.

        Parameters
        ----------
        scorer : function(y_true, y_pred), default None
            Scikit-learn-like metric that returns a score.
        k : int, default 1
            The number of folds for validation. If k=1, randomly split
            X_train into two parts; otherwise use the K-fold approach.
        test_size : float, default 0.1
            Size of the test holdout if k=1.
        stratify : bool, default False
        shuffle : bool, default True
        seed : int, default 100
        indices : list(np.array, np.array), default None
            Two numpy arrays that contain indices for train/test slicing.
            (train_index, test_index)

        Returns
        -------
        y_true : list
            Actual labels.
        y_pred : list
            Predicted labels.

        Examples
        --------
        >>> # Custom indices
        >>> train_index = np.array(range(250))
        >>> test_index = np.array(range(250, 333))
        >>> res = model_rf.validate(mean_absolute_error, indices=(train_index, test_index))
        """
        if self.use_cache:
            pdict = {'k': k, 'stratify': stratify, 'shuffle': shuffle, 'seed': seed, 'test_size': test_size}

            if indices is not None:
                pdict['train_index'] = np_hash(indices[0])
                pdict['test_index'] = np_hash(indices[1])

            dhash = self._dhash(pdict)
            c = Cache(dhash, prefix='v')
            if c.available:
                logger.info('Loading %s\'s validation results from cache.' % self._name)
            elif (self.dataset.X_train is None) and (self.dataset.y_train is None):
                self.dataset.load()

        scores = []
        y_true = []
        y_pred = []

        if k == 1:
            X_train, y_train, X_test, y_test = self.dataset.split(test_size=test_size, stratify=stratify,
                                                                  seed=seed, indices=indices)
            if self.use_cache and c.available:
                prediction = c.retrieve('0')
            else:
                prediction = self._predict(X_train, y_train, X_test, y_test)
                if self.use_cache:
                    c.store('0', prediction)

            if scorer is not None:
                scores.append(scorer(y_test, prediction))
            y_true.append(y_test)
            y_pred.append(prediction)
        else:
            for i, fold in enumerate(self.dataset.kfold(k, stratify=stratify, seed=seed, shuffle=shuffle)):
                X_train, y_train, X_test, y_test, train_index, test_index = fold
                if self.use_cache and c.available:
                    prediction = c.retrieve(str(i))
                else:
                    prediction = None

                if prediction is None:
                    logger.info('Calculating %s\'s fold #%s' % (self._name, i + 1))
                    prediction = self._predict(X_train, y_train, X_test, y_test)
                    if self.use_cache:
                        c.store(str(i), prediction)

                if scorer is not None:
                    scores.append(scorer(y_test, prediction))
                y_true.append(y_test)
                y_pred.append(prediction)

        if scorer is not None:
            report_score(scores, scorer)

        return y_true, y_pred

    def stack(self, k=5, stratify=False, shuffle=True, seed=100, full_test=True):
        """Stack a single model. You should rarely be using this method.
        Use `ModelsPipeline.stack` instead.

        Parameters
        ----------
        k : int, default 5
        stratify : bool, default False
        shuffle : bool, default True
        seed : int, default 100
        full_test : bool, default True
            If `True`, fit on the full training data to predict the test set;
            otherwise average the test predictions of every fold.

        Returns
        -------
        `Dataset` with out-of-fold predictions.
        """
        train = None
        test = []

        if self.use_cache:
            pdict = {'k': k, 'stratify': stratify, 'shuffle': shuffle, 'seed': seed, 'full_test': full_test}
            dhash = self._dhash(pdict)
            c = Cache(dhash, prefix='s')
            if c.available:
                logger.info('Loading %s\'s stack results from cache.' % self._name)
                train = c.retrieve('train')
                test = c.retrieve('test')
                y_train = c.retrieve('y_train')
                return Dataset(X_train=train, y_train=y_train, X_test=test)
            elif not self.dataset.loaded:
                self.dataset.load()

        for i, fold in enumerate(self.dataset.kfold(k, stratify=stratify, seed=seed, shuffle=shuffle)):
            X_train, y_train, X_test, y_test, train_index, test_index = fold
            logger.info('Calculating %s\'s fold #%s' % (self._name, i + 1))
            if full_test:
                prediction = reshape_1d(self._predict(X_train, y_train, X_test, y_test))
            else:
                xt_shape = X_test.shape[0]
                x_t = concat(X_test, self.dataset.X_test)
                prediction_concat = reshape_1d(self._predict(X_train, y_train, x_t))
                prediction, prediction_test = tsplit(prediction_concat, xt_shape)
                test.append(prediction_test)

            if train is None:
                train = np.zeros((self.dataset.X_train.shape[0], prediction.shape[1]))

            train[test_index] = prediction

        if full_test:
            logger.info('Calculating %s\'s test data' % self._name)
            test = self._predict(self.dataset.X_train, self.dataset.y_train, self.dataset.X_test)
        else:
            test = np.mean(test, axis=0)

        test = reshape_1d(test)

        if self.use_cache:
            c.store('train', train)
            c.store('test', test)
            c.store('y_train', self.dataset.y_train)

        return Dataset(X_train=train, y_train=self.dataset.y_train, X_test=test)

    def blend(self, proportion=0.2, stratify=False, seed=100, indices=None):
        """Blend a single model. You should rarely be using this method.
        Use `ModelsPipeline.blend` instead.

        Parameters
        ----------
        proportion : float, default 0.2
            Size of the test holdout.
        stratify : bool, default False
        seed : int, default 100
        indices : list(np.ndarray, np.ndarray), default None
            Two numpy arrays that contain indices for train/test slicing.
            (train_index, test_index)

        Returns
        -------
        `Dataset`
        """
        if self.use_cache:
            pdict = {'proportion': proportion, 'stratify': stratify, 'seed': seed, 'indices': indices}

            if indices is not None:
                pdict['train_index'] = np_hash(indices[0])
                pdict['test_index'] = np_hash(indices[1])

            dhash = self._dhash(pdict)
            c = Cache(dhash, prefix='b')
            if c.available:
                logger.info('Loading %s\'s blend results from cache.' % self._name)
                train = c.retrieve('train')
                test = c.retrieve('test')
                y_train = c.retrieve('y_train')
                return Dataset(X_train=train, y_train=y_train, X_test=test)
            elif not self.dataset.loaded:
                self.dataset.load()

        X_train, y_train, X_test, y_test = self.dataset.split(test_size=proportion, stratify=stratify,
                                                              seed=seed, indices=indices)
        xt_shape = X_test.shape[0]
        x_t = concat(X_test, self.dataset.X_test)
        prediction_concat = reshape_1d(self._predict(X_train, y_train, x_t))
        new_train, new_test = tsplit(prediction_concat, xt_shape)

        if self.use_cache:
            c.store('train', new_train)
            c.store('test', new_test)
            c.store('y_train', y_test)

        return Dataset(new_train, y_test, new_test)
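
A note on the caching scheme implemented by `hash` and `_dhash` above: because parameters are hashed in sorted key order, identical parameters always map to the same cache entry regardless of the order in which they were supplied. A standalone sketch of the same idea (the name `dict_hash` is hypothetical, not this module's API):

import hashlib

def dict_hash(base_hash, params):
    # Mirrors _dhash: mix a base hash with sorted key-value pairs.
    m = hashlib.new('md5')
    m.update(base_hash.encode('utf-8'))
    for key in sorted(params):
        m.update(('%s-%s' % (key, params[key])).encode('utf-8'))
    return m.hexdigest()

# Key order does not matter, so cache lookups are stable across calls.
assert dict_hash('abc', {'k': 5, 'seed': 100}) == dict_hash('abc', {'seed': 100, 'k': 5})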


class Regressor(BaseEstimator):
    """Wrapper for regression problems.

    Parameters
    ----------
    dataset : `Dataset` object
    estimator : estimator with a scikit-learn-like interface, or a custom function/class, optional
    parameters : dict, optional
        Arguments for the `estimator` object.
    name : str, optional
        The unique name of the `Estimator` object.
    use_cache : bool, optional
        If `True` then validate/predict/stack/blend results will be cached.
    """
    problem = 'regression'
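
A usage sketch for `Regressor`, assuming scikit-learn is available; the synthetic data, the `RandomForestRegressor` choice, and the variable names are illustrative, not part of heamy:

from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from heamy.dataset import Dataset

X, y = make_regression(n_samples=400, n_features=10, random_state=0)
dataset = Dataset(X_train=X[:300], y_train=y[:300], X_test=X[300:])

model = Regressor(dataset=dataset, estimator=RandomForestRegressor,
                  parameters={'n_estimators': 50}, name='rf')
model.validate(scorer=mean_absolute_error, k=5)  # prints a 5-fold score report
oof = model.stack(k=5)  # Dataset of out-of-fold train and test predictions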


class Classifier(BaseEstimator):
    """Wrapper for classification problems.

    Parameters
    ----------
    dataset : `Dataset` object
    estimator : estimator with a scikit-learn-like interface, or a custom function/class, optional
    parameters : dict, optional
        Arguments for the `estimator` object.
    name : str, optional
        The unique name of the `Estimator` object.
    use_cache : bool, optional
        If `True` then validate/predict/stack/blend results will be cached.
    """
    problem = 'classification'

    def __init__(self, dataset, estimator=None, parameters=None, name=None, use_cache=True, probability=True):
        super(Classifier, self).__init__(dataset=dataset, estimator=estimator, parameters=parameters,
                                         name=name, use_cache=use_cache)
        self.probability = probability
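
An equivalent sketch for `Classifier`: with the default `probability=True`, a binary problem yields the positive-class column of `predict_proba`, per the branch in `BaseEstimator._predict`. The data and the `LogisticRegression` choice are again illustrative:

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
from heamy.dataset import Dataset

X, y = make_classification(n_samples=400, random_state=0)
dataset = Dataset(X_train=X[:300], y_train=y[:300], X_test=X[300:])

clf = Classifier(dataset=dataset, estimator=LogisticRegression, name='lr')
clf.validate(scorer=log_loss, k=5)  # log-loss on out-of-fold probabilities
probs = clf.predict()               # positive-class probabilities for X_test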