Source code for heamy.dataset

# coding:utf-8
import hashlib
import inspect
import logging

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, csc_matrix
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold

from .cache import Cache, numpy_buffer
from .utils.main import idx, concat

logger = logging.getLogger('heamy.dataset')


class Dataset(object):
    """Dataset wrapper.

    Parameters
    ----------
    X_train : pd.DataFrame or np.ndarray, optional
    y_train : pd.DataFrame, pd.Series or np.ndarray, optional
    X_test : pd.DataFrame or np.ndarray, optional
    y_test : pd.DataFrame, pd.Series or np.ndarray, optional
    preprocessor : function, optional
        A callable that returns preprocessed data.
    use_cache : bool, default True
        If `use_cache=True` then the preprocessing step is cached until the
        function's source code changes.

    Examples
    --------
    >>> # function-based definition
    >>> from sklearn.datasets import load_boston
    >>> def boston_dataset():
    ...     data = load_boston()
    ...     X, y = data['data'], data['target']
    ...     X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=111)
    ...     return X_train, y_train, X_test, y_test
    >>> dataset = Dataset(preprocessor=boston_dataset)

    >>> # class-based definition
    >>> class BostonDataset(Dataset):
    ...     def preprocess(self):
    ...         data = load_boston()
    ...         X, y = data['data'], data['target']
    ...         X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=111)
    ...         return X_train, y_train, X_test, y_test
    """

    def __init__(self, X_train=None, y_train=None, X_test=None, y_test=None, preprocessor=None, use_cache=True):
        self._hash = None
        self.use_cache = use_cache
        self._preprocessor = preprocessor
        self._setup_data(X_train, y_train, X_test, y_test)

        if self.loaded:
            self._check_input()
            self._setup_columns()

        # A subclass-defined `preprocess` method takes precedence over the
        # `preprocessor` argument.
        if hasattr(self.__class__, 'preprocess'):
            self._preprocessor = self.preprocess

    def _setup_columns(self):
        # Only a DataFrame carries column labels; a Series has no `.columns`
        # attribute, so it falls through to the empty list.
        if isinstance(self._X_train, pd.DataFrame):
            self.columns = self._X_train.columns.tolist()
        else:
            self.columns = []

    def _check_input(self):
        if not self.loaded:
            raise ValueError("Missing 2 required arrays: X_train and y_train.")

        if self._X_train.shape[0] != self._y_train.shape[0]:
            raise ValueError("Found arrays with inconsistent numbers of samples: X_train(%s), y_train(%s)" %
                             (self._X_train.shape[0], self._y_train.shape[0]))

        if (self._y_test is not None and self._X_test is not None) and \
                (self._X_test.shape[0] != self._y_test.shape[0]):
            raise ValueError("Found arrays with inconsistent numbers of samples: X_test(%s), y_test(%s)" %
                             (self._X_test.shape[0], self._y_test.shape[0]))

        if (self._X_test is not None) and (self._X_train.shape[1] != self._X_test.shape[1]):
            raise ValueError("Found arrays with inconsistent numbers of features: X_train(%s), X_test(%s)" %
                             (self._X_train.shape[1], self._X_test.shape[1]))
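    # Construction sketch (illustrative, not part of the original module):
    # plain arrays can also be passed directly, bypassing the preprocessor
    # path. The names `X` and `y` below are placeholders.
    #
    #   >>> X = np.random.rand(100, 5)
    #   >>> y = np.random.randint(0, 2, size=100)
    #   >>> ds = Dataset(X_train=X, y_train=y, use_cache=False)
    #   >>> ds.loaded
    #   True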
    def load(self):
        """Run the preprocessor (or restore its cached output) and validate the result."""
        if self.loaded:
            raise ValueError("Dataset is already loaded.")

        if callable(self._preprocessor):
            if not self._load_cache():
                data = self._preprocessor()
                if isinstance(data, (list, tuple)):
                    self._setup_data(*data)
                elif isinstance(data, dict):
                    self._setup_data(**data)
                self._check_input()
                self._setup_columns()
                if self.use_cache:
                    self._cache()
    def _load_cache(self):
        """Restore the four arrays from the on-disk cache, keyed by the dataset hash."""
        if self.use_cache:
            cache = Cache(self.hash, prefix='d')
            if cache.available:
                logger.info('Loading %s from cache.' % self.__repr__())
                self._X_train = cache.retrieve('X_train')
                self._y_train = cache.retrieve('y_train')
                self._X_test = cache.retrieve('X_test')
                self._y_test = cache.retrieve('y_test')
                return True
        return False

    def _cache(self):
        """Store the arrays in the cache; only preprocessor-backed datasets are cacheable."""
        if callable(self._preprocessor):
            cache = Cache(self.hash, prefix='d')
            cache.store('X_train', self._X_train)
            cache.store('y_train', self._y_train)
            if self._X_test is not None:
                cache.store('X_test', self._X_test)
            if self._y_test is not None:
                cache.store('y_test', self._y_test)
            return True
        else:
            logger.warning("%s can't be cached." % self.__repr__())
            return False

    @property
    def name(self):
        if hasattr(self.__class__, 'preprocess') or self._preprocessor is None:
            name = self.__class__.__name__
        else:
            name = self._preprocessor.__name__
        return name

    def __repr__(self):
        return '%s(%s)' % (self.name, self.hash)

    def _setup_data(self, X_train=None, y_train=None, X_test=None, y_test=None):
        self._X_train = self._validate_data(X_train)
        self._X_test = self._validate_data(X_test)
        self._y_train = self._validate_data(y_train, only_numpy=True)
        self._y_test = self._validate_data(y_test, only_numpy=True)

    def _validate_data(self, data, only_numpy=False):
        # Coerce anything that is not already an accepted container into a
        # numpy array; targets are restricted to numpy arrays.
        if not only_numpy:
            dtypes = (pd.Series, pd.DataFrame, np.ndarray)
        else:
            dtypes = (np.ndarray,)

        if not isinstance(data, dtypes) and (data is not None):
            data = np.array(data)
        return data
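    # Cache round-trip sketch (illustrative, not part of the original module):
    # the cache key is the dataset hash, which for preprocessor-backed
    # datasets is derived from the function's source code (see the `hash`
    # property below), so editing the function body invalidates the entry.
    # `boston_dataset` is the function from the class docstring above.
    #
    #   >>> ds = Dataset(preprocessor=boston_dataset)
    #   >>> ds.load()    # first call: runs the preprocessor, then caches
    #   >>> ds2 = Dataset(preprocessor=boston_dataset)
    #   >>> ds2.load()   # second call: restored via Cache(hash, prefix='d')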
    def split(self, test_size=0.1, stratify=False, inplace=False, seed=33, indices=None):
        """Split the train set into two parts (train/test).

        Parameters
        ----------
        test_size : float, default 0.1
        stratify : bool, default False
        inplace : bool, default False
            If `True` then the dataset's train/test sets are replaced with the new data.
        seed : int, default 33
        indices : list(np.ndarray, np.ndarray), default None
            Two numpy arrays that contain the indices for train/test slicing.

        Returns
        -------
        X_train : np.ndarray
        y_train : np.ndarray
        X_test : np.ndarray
        y_test : np.ndarray

        Examples
        --------
        >>> train_index = np.array(range(250))
        >>> test_index = np.array(range(250, 333))
        >>> res = dataset.split(indices=(train_index, test_index))
        >>> res = dataset.split(test_size=0.3, seed=1111)
        """
        if not self.loaded:
            self.load()

        if stratify:
            stratify = self.y_train
        else:
            stratify = None

        if indices is None:
            X_train, X_test, y_train, y_test = train_test_split(self.X_train, self._y_train,
                                                                test_size=test_size,
                                                                random_state=seed,
                                                                stratify=stratify)
        else:
            X_train, y_train = idx(self.X_train, indices[0]), self.y_train[indices[0]]
            X_test, y_test = idx(self.X_train, indices[1]), self.y_train[indices[1]]

        if inplace:
            self._X_train, self._X_test, self._y_train, self._y_test = X_train, X_test, y_train, y_test

        return X_train, y_train, X_test, y_test
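    # Note for callers (illustrative, not part of the original module):
    # unlike sklearn's train_test_split, which returns
    # (X_train, X_test, y_train, y_test), this method returns
    # (X_train, y_train, X_test, y_test).
    #
    #   >>> X_tr, y_tr, X_te, y_te = dataset.split(test_size=0.2, stratify=True, seed=42)
    #   >>> dataset.split(test_size=0.2, inplace=True)  # also replaces the held data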
    def kfold(self, k=5, stratify=False, shuffle=True, seed=33):
        """K-Folds cross validation iterator.

        Parameters
        ----------
        k : int, default 5
        stratify : bool, default False
        shuffle : bool, default True
        seed : int, default 33

        Yields
        ------
        X_train, y_train, X_test, y_test, train_index, test_index
        """
        if stratify:
            kf = StratifiedKFold(n_splits=k, random_state=seed, shuffle=shuffle)
        else:
            kf = KFold(n_splits=k, random_state=seed, shuffle=shuffle)

        for train_index, test_index in kf.split(self.X_train, self.y_train):
            X_train, y_train = idx(self.X_train, train_index), self.y_train[train_index]
            X_test, y_test = idx(self.X_train, test_index), self.y_train[test_index]
            yield X_train, y_train, X_test, y_test, train_index, test_index
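    # CV loop sketch (illustrative, not part of the original module): the
    # generator yields everything a manual cross-validation loop needs.
    # `model` is a placeholder for any scikit-learn style estimator.
    #
    #   >>> scores = []
    #   >>> for X_tr, y_tr, X_te, y_te, tr_idx, te_idx in dataset.kfold(k=5, stratify=True):
    #   ...     model.fit(X_tr, y_tr)
    #   ...     scores.append(model.score(X_te, y_te))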
    @property
    def X_train(self):
        return self._X_train

    @property
    def y_train(self):
        return self._y_train

    @property
    def X_test(self):
        return self._X_test

    @property
    def y_test(self):
        return self._y_test

    @property
    def loaded(self):
        return (self._X_train is not None) and (self._y_train is not None)

    @property
    def hash(self):
        """Return md5 hash for current dataset."""
        if self._hash is None:
            m = hashlib.new('md5')

            if self._preprocessor is None:
                # generate hash from the numpy arrays
                m.update(numpy_buffer(self._X_train))
                m.update(numpy_buffer(self._y_train))
                if self._X_test is not None:
                    m.update(numpy_buffer(self._X_test))
                if self._y_test is not None:
                    m.update(numpy_buffer(self._y_test))
            elif callable(self._preprocessor):
                # generate hash from a user-defined object (its source code)
                m.update(inspect.getsource(self._preprocessor).encode('utf-8'))

            self._hash = m.hexdigest()
        return self._hash
    def merge(self, ds, inplace=False, axis=1):
        """Merge two datasets.

        Parameters
        ----------
        ds : `Dataset`
        inplace : bool, default False
        axis : {0, 1}

        Returns
        -------
        `Dataset`
        """
        if not isinstance(ds, Dataset):
            raise ValueError('Expected `Dataset`, got %s.' % ds)

        X_train = concat(ds.X_train, self.X_train, axis=axis)
        y_train = concat(ds.y_train, self.y_train, axis=axis)

        if ds.X_test is not None:
            X_test = concat(ds.X_test, self.X_test, axis=axis)
        else:
            X_test = None

        if ds.y_test is not None:
            y_test = concat(ds.y_test, self.y_test, axis=axis)
        else:
            y_test = None

        if inplace:
            self._X_train = X_train
            self._y_train = y_train
            if X_test is not None:
                self._X_test = X_test
            if y_test is not None:
                self._y_test = y_test
            return None

        return Dataset(X_train, y_train, X_test, y_test)
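    # Merge sketch (illustrative, not part of the original module): with the
    # default axis=1 the feature blocks are presumably concatenated
    # column-wise, so both datasets must agree on the number of rows.
    # `dataset_a` and `dataset_b` are placeholders.
    #
    #   >>> wide = dataset_a.merge(dataset_b, axis=1)  # new Dataset, inputs untouched
    #   >>> dataset_a.merge(dataset_b, inplace=True)   # mutates dataset_a, returns None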
    def to_csc(self):
        """Convert the dataset to scipy's Compressed Sparse Column matrix."""
        self._X_train = csc_matrix(self._X_train)
        if self._X_test is not None:
            self._X_test = csc_matrix(self._X_test)
    def to_csr(self):
        """Convert the dataset to scipy's Compressed Sparse Row matrix."""
        self._X_train = csr_matrix(self._X_train)
        if self._X_test is not None:
            self._X_test = csr_matrix(self._X_test)
    def to_dense(self):
        """Convert a sparse dataset back to a dense matrix."""
        if hasattr(self._X_train, 'todense'):
            self._X_train = self._X_train.todense()
        if hasattr(self._X_test, 'todense'):
            self._X_test = self._X_test.todense()
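# Sparse-conversion sketch (illustrative, not part of the original module):
# to_csr()/to_csc() mutate the wrapped feature matrices in place, and
# to_dense() converts them back. `dataset` is a placeholder for a loaded
# Dataset instance.
#
#   >>> dataset.to_csr()
#   >>> type(dataset.X_train)   # -> scipy.sparse.csr_matrix
#   >>> dataset.to_dense()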