Example code that I used for a similar purpose is below. It would need to be refactored and fitted into the probatus API.
from typing import List, Tuple

import numpy as np
import pandas as pd
import lightgbm
from scipy.stats import spearmanr
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from pandas.api.types import is_numeric_dtype
from probatus.feature_elimination import ShapRFECV
from probatus.stat_tests import DistributionStatistics, AutoDist
feature_names = ['f1_missing', 'f2_missing', 'f3_unstable', 'f4_unstable', 'f5_unstable', 'f6_correlated', 'f7_correlated', 'f8_correlated', 'f9', 'f10', 'f11', 'f12', 'f13', 'f14', 'f15', 'f16', 'f17', 'f18', 'f19', 'f20']
# Prepare two samples
X, y = make_classification(n_samples=1000, class_sep=0.05, n_informative=6, n_features=20,
random_state=1, n_redundant=10, n_clusters_per_class=1)
X = pd.DataFrame(X, columns=feature_names)
X['f1_missing'] = X['f1_missing'].apply(lambda x: x if np.random.rand()<0.5 else np.nan)
X['f2_missing'] = X['f2_missing'].apply(lambda x: x if np.random.rand()<0.8 else np.nan)
X['f7_correlated'] = X['f6_correlated'] + 0.5
X['f8_correlated'] = X['f6_correlated'] * 0.5
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
# Introduce shift in certain features in test
X_test['f3_unstable'] = X_test['f3_unstable'] + 2
X_test['f4_unstable'] = X_test['f4_unstable'] + 2
X_test['f5_unstable'] = X_test['f5_unstable'] + 2
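As a quick sanity check (my addition, not part of the original snippet), the induced missingness, correlation, and shift can be verified directly:
# Sanity check (illustration only): verify the induced data properties
print(X[['f1_missing', 'f2_missing']].isna().mean())                        # roughly 0.5 and 0.2 missing
print(X_train[['f6_correlated', 'f7_correlated', 'f8_correlated']].corr())  # pairwise correlations of 1.0
print(X_test['f3_unstable'].mean() - X_train['f3_unstable'].mean())         # shift of roughly 2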
def clean_correlation_matrix(corr_matrix: np.ndarray, replaced_value: int = -1) -> np.ndarray:
    """
    Overwrite the diagonal and lower-triangle values of a 2D correlation matrix with replaced_value.

    Args:
        corr_matrix (np.ndarray): 2D matrix with correlation values.
        replaced_value (int, optional): Value that is imputed, by default -1.

    Returns:
        (np.ndarray) Correlation matrix with only the strict upper triangle preserved.
    """
    # Work on a copy so the caller's matrix is not mutated.
    cleaned_corr_matrix = corr_matrix.copy()
    for i in range(len(cleaned_corr_matrix)):
        for j in range(len(cleaned_corr_matrix)):
            if i >= j:
                cleaned_corr_matrix[i, j] = replaced_value
    return cleaned_corr_matrix
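As a small illustration (my addition), masking a 3x3 matrix keeps only the strict upper triangle:
# Illustration: diagonal and lower triangle are overwritten with -1
demo = np.array([[1.0, 0.8, 0.3],
                 [0.8, 1.0, 0.6],
                 [0.3, 0.6, 1.0]])
print(clean_correlation_matrix(demo))
# [[-1.   0.8  0.3]
#  [-1.  -1.   0.6]
#  [-1.  -1.  -1. ]]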
def remove_correlated_features(corr_matrix: np.ndarray, correlation_threshold: float, feature_names: List[str]) -> Tuple[List[str], List[str], List[str]]:
    """
    Iteratively remove correlated features. At each step it looks at the most correlated feature pair and asks the
    user to select which feature to keep based on expert knowledge. The process continues until all pairs have a
    correlation below correlation_threshold.

    Args:
        corr_matrix (np.ndarray): 2D correlation matrix.
        correlation_threshold (float): Threshold above which correlated features are removed.
        feature_names (list of str): List of feature names.

    Returns:
        (list, list, list) Feature names remaining after removal, the feature selected at each iteration of removal,
        and the feature removed at each iteration.
    """
    # Take absolute value of correlations
    corr_matrix = np.abs(corr_matrix)
    # Replace diagonal and lower-triangle values
    clean_corr_matrix = clean_correlation_matrix(corr_matrix, -1)
    # Init variables
    selected_features = np.array([])
    removed_features = np.array([])
    remaining_features = np.array(feature_names)
    # Iteratively remove correlated features one by one while any correlation is above the threshold
    while np.max(clean_corr_matrix) > correlation_threshold:
        position_max_correlated_features = np.unravel_index(clean_corr_matrix.argmax(),
                                                            clean_corr_matrix.shape)
        print(f'Select one of the following features with correlation '
              f'{clean_corr_matrix[position_max_correlated_features]}: \n'
              f'0 for {remaining_features[position_max_correlated_features[0]]} \n'
              f'1 for {remaining_features[position_max_correlated_features[1]]}')
        selected_option = int(input())
        removed_option = 1 - selected_option
        # Append for tracking results
        removed_features = np.append(removed_features,
                                     remaining_features[position_max_correlated_features[removed_option]])
        selected_features = np.append(selected_features,
                                      remaining_features[position_max_correlated_features[selected_option]])
        print(f'Removing {remaining_features[position_max_correlated_features[removed_option]]}, Num of features left'
              f' {len(clean_corr_matrix) - 1}')
        # Remove the feature from the name list and from both axes of the matrix
        remaining_features = np.delete(remaining_features, position_max_correlated_features[removed_option])
        clean_corr_matrix = np.delete(clean_corr_matrix, position_max_correlated_features[removed_option], 0)
        clean_corr_matrix = np.delete(clean_corr_matrix, position_max_correlated_features[removed_option], 1)
    return list(remaining_features), list(selected_features), list(removed_features)
corr_matrix_numeric, p_val_matrix_numeric = spearmanr(X_train[feature_names], nan_policy='omit')
remaining_features, selected_features, removed_features = remove_correlated_features(corr_matrix_numeric, 0.95, feature_names)
print(f'Dropping highly correlated features {removed_features}.')
X_train = X_train.drop(columns=removed_features)
X_test = X_test.drop(columns=removed_features)
feature_names = [feature for feature in feature_names if feature not in removed_features]
You could add a class to the feature_elimination module. Its API would look like this:
__init__(correlation_type='spearman' or 'pearson', correlation_threshold=0.95, **kwargs passed to the correlation method)
fit(X, y)
compute() -> pd.DataFrame
fit_compute(X, y)
plot()  # Correlation matrix (possibly before and after removal)
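A minimal sketch of such a class, assuming a non-interactive policy (always keep the first feature of the most correlated pair) in place of the input() prompt above; the class name, defaults, and internals are my assumptions, not an existing probatus API:
# Hypothetical sketch, not an existing probatus class.
class CorrelatedFeatureRemoval:
    def __init__(self, correlation_type='spearman', correlation_threshold=0.95, **correlation_kwargs):
        self.correlation_type = correlation_type
        self.correlation_threshold = correlation_threshold
        self.correlation_kwargs = correlation_kwargs

    def fit(self, X, y=None):
        # Mask the lower triangle, then greedily drop one feature of the most
        # correlated pair until all remaining pairs fall below the threshold.
        corr = np.abs(X.corr(method=self.correlation_type, **self.correlation_kwargs).to_numpy())
        corr = clean_correlation_matrix(corr)
        features = np.array(X.columns)
        removed = []
        while np.max(corr) > self.correlation_threshold:
            i, j = np.unravel_index(corr.argmax(), corr.shape)
            removed.append(features[j])  # non-interactive policy: keep feature i, drop feature j
            features = np.delete(features, j)
            corr = np.delete(np.delete(corr, j, 0), j, 1)
        self.remaining_features_ = list(features)
        self.removed_features_ = list(removed)
        return self

    def compute(self):
        # One row per removed feature.
        return pd.DataFrame({'removed_feature': self.removed_features_})

    def fit_compute(self, X, y=None):
        return self.fit(X, y).compute()

    def plot(self):
        # Correlation matrix (possibly before and after removal); omitted in this sketch.
        raise NotImplementedError
Usage would then be e.g. CorrelatedFeatureRemoval(correlation_threshold=0.95).fit_compute(X_train).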