Source code for ex_fuzzy.utils

# -*- coding: utf-8 -*-
"""
Functions that are not fuzzy-specific, but util for some computations. 
Dedicated mostly to compute quantiles for fuzzy partitions.

"""
import numpy as np
import pandas as pd

try:
    from . import fuzzy_sets as fs
    from . import maintenance as mnt
    from . import temporal
    from . import rules
    from . import eval_rules as evr
except ImportError:
    import fuzzy_sets as fs
    import maintenance as mnt
    import temporal
    import rules
    import eval_rules as evr


from sklearn.model_selection import train_test_split


[docs] def quartile_compute(x: np.array) -> list[float]: ''' Computes the quartiles for each feature. :param x: array samples x features :return: list of quartiles for each feature ''' return np.quantile(x, [0, 0.25, 0.5, 1], axis=0)
[docs] def fixed_quantile_compute(x: np.array) -> list[float]: ''' Computes a series of quantiles for each feature in numpy array. Quantiles: [0, 0.20, 0.30, 0.45, 0.55, 0.7, 0.8, 1] :param x: array samples x features :return: list of quantiles for each feature ''' return np.quantile(x, [0, 0.20, 0.30, 0.45, 0.55, 0.7, 0.8, 1], axis=0)
[docs] def partition3_quantile_compute(x: np.array) -> list[float]: ''' Computes a series of quantiles partitioning the variable in 3 cases. Quantiles: [0.00, 0.20, 0.50, 0.80, 1.00] :param x: array samples x features :return: list of quantiles for each feature ''' return np.quantile(x, [0, 0.20, 0.50, 0.80, 1.00], axis=0)
[docs] def t1_simple_partition(x: np.array) -> np.array: ''' Partitions the fuzzy variable in four trapezoidal memberships. :param x: numpy array, vector of shape (samples, ). :return: numpy array, vector of shape (variables, 4, 4). ''' n_partitions = 4 trap_memberships_size = 4 quantile_numbers = fixed_quantile_compute(x) partition_parameters = np.zeros( (x.shape[1], n_partitions, trap_memberships_size)) for partition in range(n_partitions): if partition == 0: partition_parameters[:, partition, 0] = quantile_numbers[0] partition_parameters[:, partition, 1] = quantile_numbers[0] partition_parameters[:, partition, 2] = quantile_numbers[1] partition_parameters[:, partition, 3] = quantile_numbers[2] elif partition == n_partitions - 1: partition_parameters[:, partition, 0] = quantile_numbers[-3] partition_parameters[:, partition, 1] = quantile_numbers[-2] partition_parameters[:, partition, 2] = quantile_numbers[-1] partition_parameters[:, partition, 3] = quantile_numbers[-1] else: pointer = 1 if partition == 1 else 4 partition_parameters[:, partition, 0] = quantile_numbers[pointer] partition_parameters[:, partition, 1] = quantile_numbers[pointer + 1] partition_parameters[:, partition, 2] = quantile_numbers[pointer + 2] partition_parameters[:, partition, 3] = quantile_numbers[pointer + 3] return partition_parameters
[docs] def compute_quantiles(x, n_partitions): ''' Computes the quantiles needed for n-partition fuzzy membership. :param x: numpy array, vector of shape (samples, ). :param n_partitions: int, number of partitions. :return: numpy array, quantiles for partitioning. ''' quantiles = np.linspace(0, 100, n_partitions + 2) return np.nanpercentile(x, quantiles, axis=0)
[docs] def t1_n_partition_parameters(x, n_partitions): ''' Partitions the fuzzy variable in n trapezoidal memberships. :param x: numpy array, matrix of shape (samples, variables). :param n_partitions: int, number of partitions. :return: numpy array, tensor of shape (variables, n_partitions, 4) containing the trapezoidal parameters. ''' trap_memberships_size = 4 n_variables = x.shape[1] quantile_numbers = compute_quantiles(x, n_partitions) # Initialize the array for partition parameters partition_parameters = np.zeros((n_variables, n_partitions, trap_memberships_size)) for partition in range(n_partitions): if partition == 0: # First partition partition_parameters[:, partition, 0] = quantile_numbers[0, :] partition_parameters[:, partition, 1] = quantile_numbers[0, :] partition_parameters[:, partition, 2] = quantile_numbers[1, :] partition_parameters[:, partition, 3] = quantile_numbers[2, :] elif partition == n_partitions - 1: # Last partition partition_parameters[:, partition, 0] = quantile_numbers[partition, :] partition_parameters[:, partition, 1] = quantile_numbers[partition + 1, :] partition_parameters[:, partition, 2] = quantile_numbers[partition + 2, :] partition_parameters[:, partition, 3] = quantile_numbers[partition + 2, :] else: # Intermediate partitions partition_parameters[:, partition, 0] = quantile_numbers[partition, :] partition_parameters[:, partition, 1] = (quantile_numbers[partition, :] + quantile_numbers[partition + 1, :] ) / 2 partition_parameters[:, partition, 2] = (quantile_numbers[partition + 1, :] + quantile_numbers[partition + 2, :] ) / 2 partition_parameters[:, partition, 3] = quantile_numbers[partition + 2, :] return partition_parameters
[docs] def t1_three_partition(x: np.array) -> np.array: ''' Partitions the fuzzy variable in three trapezoidal memberships. :param x: numpy array, vector of shape (samples, ). :return: numpy array, vector of shape (variables, 3, 4). ''' n_partitions = 3 trap_memberships_size = 4 quantile_numbers = partition3_quantile_compute(x) partition_parameters = np.zeros( (x.shape[1], n_partitions, trap_memberships_size)) for partition in range(n_partitions): if partition == 0: partition_parameters[:, partition, 0] = quantile_numbers[0] partition_parameters[:, partition, 1] = quantile_numbers[0] partition_parameters[:, partition, 2] = quantile_numbers[1] partition_parameters[:, partition, 3] = quantile_numbers[2] elif partition == 1: partition_parameters[:, partition, 0] = quantile_numbers[1] partition_parameters[:, partition, 1] = ( quantile_numbers[1] + quantile_numbers[2]) / 2 partition_parameters[:, partition, 2] = ( quantile_numbers[3] + quantile_numbers[2]) / 2 partition_parameters[:, partition, 3] = quantile_numbers[3] else: partition_parameters[:, partition, 0] = quantile_numbers[2] partition_parameters[:, partition, 1] = quantile_numbers[3] partition_parameters[:, partition, 2] = quantile_numbers[4] partition_parameters[:, partition, 3] = quantile_numbers[4] return partition_parameters
[docs] def t2_n_partition_parameters(x, n_partitions): ''' Partitions the fuzzy variable in n trapezoidal memberships. :param x: numpy array, matrix of shape (samples, variables). :param n_partitions: int, number of partitions. :return: numpy array, tensor of shape (variables, n_partitions, 4, 2) containing the trapezoidal parameters. ''' trap_memberships_size = 4 n_variables = x.shape[1] quantile_numbers = compute_quantiles(x, n_partitions) # Initialize the array for partition parameters partition_parameters = np.zeros((n_variables, n_partitions, trap_memberships_size, 2)) for partition in range(n_partitions): if partition == 0: # First partition partition_parameters[:, partition, 0, 1] = quantile_numbers[0] partition_parameters[:, partition, 1, 1] = quantile_numbers[0] partition_parameters[:, partition, 2, 1] = quantile_numbers[1] partition_parameters[:, partition, 3, 1] = quantile_numbers[2] partition_parameters[:, partition, 0, 0] = quantile_numbers[0] partition_parameters[:, partition, 1, 0] = quantile_numbers[0] partition_parameters[:, partition, 2, 0] = quantile_numbers[1] partition_parameters[:, partition, 3, 0] = quantile_numbers[1] + \ 0.9 * (quantile_numbers[2] - quantile_numbers[1]) elif partition == n_partitions - 1: # Last partition partition_parameters[:, partition, 0, 1] = quantile_numbers[partition] partition_parameters[:, partition, 1, 1] = quantile_numbers[partition + 1] partition_parameters[:, partition, 2, 1] = quantile_numbers[partition + 2] partition_parameters[:, partition, 3, 1] = quantile_numbers[partition + 2] partition_parameters[:, partition, 0, 0] = quantile_numbers[partition] + \ 0.1 * (quantile_numbers[partition + 1] - quantile_numbers[partition]) partition_parameters[:, partition, 1, 0] = quantile_numbers[partition + 1] partition_parameters[:, partition, 2, 0] = quantile_numbers[partition + 2] partition_parameters[:, partition, 3, 0] = quantile_numbers[partition + 2] else: # Intermediate partitions partition_parameters[:, partition, 0, 1] = quantile_numbers[partition, :] partition_parameters[:, partition, 1, 1] = (quantile_numbers[partition, :] + quantile_numbers[partition + 1, :] ) / 2 partition_parameters[:, partition, 2, 1] = (quantile_numbers[partition + 1, :] + quantile_numbers[partition + 2, :] ) / 2 partition_parameters[:, partition, 3, 1] = quantile_numbers[partition + 2, :] partition_parameters[:, partition, 0, 0] = quantile_numbers[partition] + \ 0.1 * (quantile_numbers[partition + 1] - quantile_numbers[partition]) partition_parameters[:, partition, 1, 0] = (quantile_numbers[partition, :] + quantile_numbers[partition + 1, :] ) / 2 partition_parameters[:, partition, 2, 0] = (quantile_numbers[partition + 1, :] + quantile_numbers[partition + 2, :] ) / 2 partition_parameters[:, partition, 3, 0] = ((quantile_numbers[partition + 1, :] + quantile_numbers[partition + 2, :] ) / 2) + \ 0.9 * (quantile_numbers[partition + 2] - ((quantile_numbers[partition + 1, :] + quantile_numbers[partition + 2, :] ) / 2)) return partition_parameters
[docs] def t1_simple_triangular_partition_parameters(x: np.array) -> np.array: ''' Partitions the fuzzy variable in three triangular memberships. :param x: numpy array, vector of shape (samples, ). :return: numpy array, vector of shape (variables, 3, 3). ''' n_partitions = 3 trap_memberships_size = 4 #quantile_numbers = partition3_quantile_compute(x) quantile_numbers = np.nanpercentile(x, [0, 25, 50, 75, 100], axis=0) partition_parameters = np.zeros( (x.shape[1], n_partitions, trap_memberships_size)) for partition in range(n_partitions): if partition == 0: partition_parameters[:, partition, 0] = quantile_numbers[0] partition_parameters[:, partition, 1] = quantile_numbers[0] partition_parameters[:, partition, 2] = quantile_numbers[0] partition_parameters[:, partition, 3] = quantile_numbers[2] elif partition == 1: partition_parameters[:, partition, 0] = quantile_numbers[1] partition_parameters[:, partition, 1] = quantile_numbers[2] partition_parameters[:, partition, 2] = quantile_numbers[2] partition_parameters[:, partition, 3] = quantile_numbers[3] else: partition_parameters[:, partition, 0] = quantile_numbers[2] partition_parameters[:, partition, 1] = quantile_numbers[4] partition_parameters[:, partition, 2] = quantile_numbers[4] partition_parameters[:, partition, 3] = quantile_numbers[4] return partition_parameters
[docs] def t1_simple_triangular_partition(x: np.array, n_partitions:int=3) -> list[np.array]: ''' Partitions the dataset features into different fuzzy variables. Parameters are prefixed. Use it for simple testing and initial solution. :param x: numpy array|pandas dataframe, shape samples x features. :return: list of fuzzy variables. ''' partition_parameters = t1_simple_triangular_partition_parameters(x) res = [] for fz_parameter in range(partition_parameters.shape[0]): fzs = [fs.FS(str(ix), partition_parameters[fz_parameter, ix, :], [ np.min(x), np.max(x)]) for ix in range(partition_parameters.shape[1])] res.append(fs.fuzzyVariable(str(fz_parameter), fzs)) return res
[docs] def t1_fuzzy_partitions_dataset(x0: np.array, n_partition=3) -> list[fs.fuzzyVariable]: ''' Partitions the dataset features into different fuzzy variables. Parameters are prefixed. Use it for simple testing and initial solution. :param x: numpy array|pandas dataframe, shape samples x features. :param n_partition: number of partitions to use in the fuzzy variables. :return: list of fuzzy variables. ''' if n_partition == 3: partition_names = ['Low', 'Medium', 'High'] elif n_partition == 5: partition_names = ['Very Low', 'Low', 'Medium', 'High', 'Very High'] else: partition_names = [str(ix) for ix in range(n_partition)] try: fv_names = x0.columns x = x0.values except AttributeError: fv_names = [str(ix) for ix in range(x0.shape[1])] x = x0 mins = np.min(x, axis=0) maxs = np.max(x, axis=0) fz_memberships = t1_n_partition_parameters(x, n_partitions=n_partition) res = [] for fz_parameter in range(fz_memberships.shape[0]): fzs = [fs.FS(partition_names[ix], fz_memberships[fz_parameter, ix, :], [ mins[fz_parameter], maxs[fz_parameter]]) for ix in range(fz_memberships.shape[1])] res.append(fs.fuzzyVariable(fv_names[fz_parameter], fzs)) return res
[docs] def t2_fuzzy_partitions_dataset(x0: np.array, n_partition=3) -> list[fs.fuzzyVariable]: ''' Partitions the dataset features into different fuzzy variables using iv fuzzy sets. Parameters are prefixed. Use it for simple testing and initial solution. :param x: numpy array|pandas dataframe, shape samples x features. :param n_partition: number of partitions to use in the fuzzy variables. :return: list of fuzzy variables. ''' if n_partition == 3: partition_names = ['Low', 'Medium', 'High'] elif n_partition == 5: partition_names = ['Very Low', 'Low', 'Medium', 'High', 'Very High'] else: partition_names = [str(ix) for ix in range(n_partition)] try: fv_names = x0.columns x = x0.values except AttributeError: fv_names = [str(ix) for ix in range(x0.shape[1])] x = x0 mins = np.min(x, axis=0) maxs = np.max(x, axis=0) fz_memberships = t2_n_partition_parameters(x, n_partition) res = [] for fz_parameter in range(fz_memberships.shape[0]): fzs = [fs.IVFS(partition_names[ix], fz_memberships[fz_parameter, ix, :, 0], fz_memberships[fz_parameter, ix, :, 1], [ mins[fz_parameter], maxs[fz_parameter]], lower_height=0.8) for ix in range(fz_memberships.shape[1])] res.append(fs.fuzzyVariable(fv_names[fz_parameter], fzs)) return res
[docs] def gt2_fuzzy_partitions_dataset(x0: np.array, resolution_exp:int=2, n_partition=3) -> list[fs.fuzzyVariable]: ''' Partitions the dataset features into different fuzzy variables using gt2 fuzzy sets. Parameters are prefixed. Use it for simple testing and initial solution. :param x: numpy array|pandas dataframe, shape samples x features. :param resolution_exp: exponent of the resolution of the partition. Default is 2, which means 0.01. (Number of significant decimals) :param n_partition: number of partitions to use in the fuzzy variables. :return: list of fuzzy variables. ''' try: fv_names = x0.columns x = x0.values except AttributeError: fv_names = [str(ix) for ix in range(x0.shape[1])] x = x0 mins = np.min(x, axis=0) maxs = np.max(x, axis=0) iv_simple_partition = t2_fuzzy_partitions_dataset(x, n_partition=n_partition) resolution = 10.0**-np.abs(resolution_exp) res = [] # We iterate through all possible variables for ix_var, fz_var in enumerate(iv_simple_partition): domain_resolution = np.arange( mins[ix_var], maxs[ix_var] + resolution, resolution) fzs = [] for ix_lv, fz_lv in enumerate(fz_var.get_linguistic_variables()): memberships = fz_lv.membership(domain_resolution) fs_domain = {} for ix_z, x in enumerate(domain_resolution): membership_z = memberships[ix_z] fs_domain[x] = fs.FS(str(x), [membership_z[0], np.mean( membership_z), np.mean(membership_z), membership_z[1]], [0.0, 1.0]) fzs.append(fs.GT2(fz_lv.name, fs_domain, [ mins[ix_var], maxs[ix_var]], significant_decimals=np.abs(resolution_exp), unit_resolution=0.01)) res.append(fs.fuzzyVariable(fv_names[ix_var], fzs)) return res
[docs] def construct_partitions(X : np.array, fz_type_studied:fs.FUZZY_SETS=fs.FUZZY_SETS.t1, categorical_mask: np.array=None, n_partitions=3) -> list[fs.fuzzyVariable]: ''' Returns a list of linguistic variables according to the kind of fuzzy specified. :param X: numpy array|pandas dataframe, shape samples x features. :param fz_type_studied: fuzzy set type studied. :param categorial_mask: a boolean mask vector that indicates for each variables if its categorical or not. :param n_partitions: number of partitions to use in the fuzzy set. ''' if mnt.save_usage_flag: mnt.usage_data[mnt.usage_categories.Funcs]['precompute_labels'] += 1 mnt.usage_data[mnt.usage_categories.FuzzySets][fz_type_studied.name] += 1 if isinstance(X, pd.DataFrame): feat_names = X.columns X = X.values else: feat_names = [str(ix) for ix in range(X.shape[1])] # Get the X dataframe without the categorical variables if categorical_mask is not None: X_numerical = X[:, np.logical_not(categorical_mask)] else: X_numerical = X if sum(np.logical_not(categorical_mask)) > 0: if fz_type_studied == fs.FUZZY_SETS.t1: precomputed_partitions = t1_fuzzy_partitions_dataset(X_numerical, n_partitions) elif fz_type_studied == fs.FUZZY_SETS.t2: precomputed_partitions = t2_fuzzy_partitions_dataset(X_numerical, n_partitions) elif fz_type_studied == fs.FUZZY_SETS.gt2: precomputed_partitions = gt2_fuzzy_partitions_dataset(X_numerical, n_partitions) else: raise ValueError('Fuzzy set type not recognized') if categorical_mask is not None: categorical_partition = {} for ix, elem in enumerate(categorical_mask): if elem: if isinstance(X, pd.DataFrame): name = X.columns[ix] else: name = str(ix) cat_var = construct_crisp_categorical_partition(np.array(X)[:, ix], name, fz_type_studied) categorical_partition[name] = cat_var # Reorder the partitions so that they follow the same order as in the original X precomputed_partitions_aux = [] for ix, elem in enumerate(categorical_mask): if isinstance(X, pd.DataFrame): name = X.columns[ix] else: name = str(ix) if elem: precomputed_partitions_aux.append(categorical_partition[name]) else: precomputed_partitions_aux.append(precomputed_partitions.pop(0)) precomputed_partitions = precomputed_partitions_aux for ix, partition in enumerate(precomputed_partitions): partition.name = feat_names[ix] return precomputed_partitions
def _triangular_construct_partitions(X : np.array, fz_type_studied:fs.FUZZY_SETS=fs.FUZZY_SETS.t1, categorical_mask: np.array=None, n_partitions=3) -> list[fs.fuzzyVariable]: ''' Returns a list of linguistic variables according to the kind of fuzzy specified. :param X: numpy array|pandas dataframe, shape samples x features. :param fz_type_studied: fuzzy set type studied. :param categorial_mask: a boolean mask vector that indicates for each variables if its categorical or not. :param n_partitions: number of partitions to use in the fuzzy set. ''' if mnt.save_usage_flag: mnt.usage_data[mnt.usage_categories.Funcs]['precompute_labels'] += 1 mnt.usage_data[mnt.usage_categories.FuzzySets][fz_type_studied.name] += 1 if isinstance(X, pd.DataFrame): feat_names = X.columns X = X.values else: feat_names = [str(ix) for ix in range(X.shape[1])] # Get the X dataframe without the categorical variables if categorical_mask is not None: X_numerical = X[:, np.logical_not(categorical_mask)] else: X_numerical = X if fz_type_studied == fs.FUZZY_SETS.t1: precomputed_partitions = t1_simple_triangular_partition(X_numerical) elif fz_type_studied == fs.FUZZY_SETS.t2: raise NotImplementedError('Triangular partitions not implemented for t2 fuzzy sets') elif fz_type_studied == fs.FUZZY_SETS.gt2: raise NotImplementedError('Triangular partitions not implemented for gt2 fuzzy sets') else: raise ValueError('Fuzzy set type not recognized') if categorical_mask is not None: categorical_partition = {} for ix, elem in enumerate(categorical_mask): if elem: if isinstance(X, pd.DataFrame): name = X.columns[ix] else: name = str(ix) cat_var = construct_crisp_categorical_partition(np.array(X)[:, ix], name, fz_type_studied) categorical_partition[name] = cat_var # Reorder the partitions so that they follow the same order as in the original X precomputed_partitions_aux = [] for ix, elem in enumerate(categorical_mask): if isinstance(X, pd.DataFrame): name = X.columns[ix] else: name = str(ix) if elem: precomputed_partitions_aux.append(categorical_partition[name]) else: precomputed_partitions_aux.append(precomputed_partitions.pop(0)) precomputed_partitions = precomputed_partitions_aux for ix, partition in enumerate(precomputed_partitions): partition.name = feat_names[ix] return precomputed_partitions
[docs] def construct_crisp_categorical_partition(x: np.array, name: str, fz_type_studied: fs.FUZZY_SETS) -> fs.fuzzyVariable: ''' Creates a fuzzy variable for a categorical feature. :param x: array with values of the categorical variable. :param name of the fuzzy variable. :param fz_type_studied: fuzzy set type studied. :return: a fuzzy variable that works as a categorical crips variable (each fuzzy set is 1 exactly on each class value, and 0 on the rest). ''' possible_values = np.unique(x) fuzzy_sets = [] # Create a fuzzy sets for each possible value for ix, value in enumerate(possible_values): if fz_type_studied == fs.FUZZY_SETS.t1: aux = fs.categoricalFS(str(value), value) elif fz_type_studied == fs.FUZZY_SETS.t2 or fz_type_studied == fs.FUZZY_SETS.gt2: aux = fs.categoricalIVFS(str(value), np.unique(x)) fuzzy_sets.append(aux) return fs.fuzzyVariable(name, fuzzy_sets)
[docs] def construct_conditional_frequencies(X: np.array, discrete_time_labels: list[int], initial_ffss: list[fs.FS]): ''' Computes the conditional temporal function for a set of fuzzy sets according to their variation in time. :param X: numpy array, shape samples x features. :param discrete_time_labels: discrete time labels. :param initial_fs: initial fuzzy set list. :return: conditional frequencies. Array shape (time steps, initial fuzzy sets) ''' obs = X.shape[0] discrete_time_labels = np.array(discrete_time_labels) memberships = np.zeros((obs, len(initial_ffss))) for ix, fset in enumerate(initial_ffss): if fset.type() == fs.FUZZY_SETS.t2: memberships[:, ix] = np.mean(fset.membership(X), axis=1) elif fset.type() == fs.FUZZY_SETS.gt2: memberships[:, ix] = np.mean(np.squeeze(fset._alpha_reduction(fset.membership(X))), axis=1) else: memberships[:, ix] = fset.membership(X) max_memberships = np.argmax(memberships, axis=1) res = np.zeros((len(np.unique(discrete_time_labels)), len(initial_ffss))) for time in range(len(np.unique(discrete_time_labels))): relevant_memberships = max_memberships[time == discrete_time_labels] fs_winner_counter = np.zeros(len(initial_ffss)) for ix, fset in enumerate(initial_ffss): fs_winner_counter[ix] = np.sum(relevant_memberships == ix) res[time, :] = fs_winner_counter return res / (np.max(res, axis=0) + 1e-6)
[docs] def classify_temp(dates: pd.DataFrame, cutpoints: tuple[str, str], time: str) -> np.array: ''' Classifies a set of dates according to the temporal cutpoints. Uses {time} as a the time resolution. Returns an array where true values are those values contained between those two date points. :param dates: data observations to cut. :param cutpoints: points to check. :param time: time field to use as the criteria. :return: boolean array. True values are those contained between the cutpoints. ''' def extract_hour(row): return row.__getattribute__(time) hours = pd.to_datetime(dates['date']).apply(extract_hour) cutpoint_series_0 = pd.to_datetime(pd.Series([cutpoints[0]] * len(dates))) cutpoint_series_0.index = dates.index hours0 = cutpoint_series_0.apply(extract_hour) cutpoint_series_1 = pd.to_datetime(pd.Series([cutpoints[1]] * len(dates))) cutpoint_series_1.index = dates.index hours1 = cutpoint_series_1.apply(extract_hour) condicion1 = hours >= hours0 condicion2 = hours <= hours1 return np.array(np.logical_and(condicion1, condicion2))
[docs] def assign_time(a: np.array, observations: list[np.array]) -> int: ''' Assigns a temporal moment to a set of observations. :param a: array of boolean values. :param observations: list of boolean arrays with the corresponding timestamps. :return: the index of the correspondent time moment for the a-th observation. :raises: ValueError if a is not timestamped in any of the observation arrays.''' for ix, obs in enumerate(observations): if obs[a]: return ix raise ValueError('No temporal moment assigned')
[docs] def create_tempVariables(X_train: np.array, time_moments: np.array, precomputed_partitions: list[fs.fuzzyVariable]) -> list[temporal.temporalFS]: ''' Creates a list of temporal fuzzy variables. :param X_train: numpy array, shape samples x features. :param time_moments: time moments. Array shape (samples,). Each value is an integer denoting the n-th time moment of that observation. :param precomputed_partitions: precomputed partitions for each feature. :return: list of temporal fuzzy variables. ''' temp_partitions = [] for ix in range(X_train.shape[1]): feat_conditional = construct_conditional_frequencies(X_train[:, ix], time_moments, initial_ffss=precomputed_partitions[ix]) temp_fs_list = [] for vl in range(feat_conditional.shape[1]): vl_temp_fs = temporal.temporalFS(precomputed_partitions[ix][vl], feat_conditional[:, vl]) temp_fs_list.append(vl_temp_fs) temp_fs_variable = temporal.temporalFuzzyVariable(precomputed_partitions[ix].name, temp_fs_list) temp_partitions.append(temp_fs_variable) return temp_partitions
[docs] def create_multi_tempVariables(X_train: np.array, time_moments: np.array, fuzzy_type: fs.FUZZY_SETS) -> list[list[temporal.temporalFS]]: ''' Creates a of list of lists of temporal fuzzy variables. Each corresponds to a fuzzy partition in a different moment in time. (So, instead of having one vl for all time moments, you have one different for each time moment that represents the same idea) :param X_train: numpy array, shape samples x features. :param time_moments: time moments. Array shape (samples,). Each value is an integer denoting the n-th time moment of that observation. :param precomputed_partitions: precomputed partitions for each feature. :return: list of lists of temporal fuzzy variables. ''' temp_partitions = [] unique_time_moments = np.unique(time_moments) for time in unique_time_moments: X_obs = X_train[time_moments == time, :] precomputed_partitions = construct_partitions(X_obs, fuzzy_type) temp_partitions.append(create_tempVariables(X_obs, time_moments[time_moments == time], precomputed_partitions)) return temp_partitions
[docs] def temporal_cuts(X: pd.DataFrame, cutpoints: list[tuple[str, str]], time_resolution: str='hour') -> list[np.array]: ''' Returns a list of boolean indexes for each temporal moment. Performs the cuts between time steps using the cutpoints list. :param X: data observations to cut in temrporal moments. :param temporal_moments: list of temporal moments to cut. :param cutpoints: list of tuples with the cutpoints for each temporal moment. :param time_resolution: time field to use as the criteria. :return: list of boolean arrays. True values are those contained between the cutpoints in each moment. ''' res = [] for ix, cutpoint in enumerate(cutpoints): observations = classify_temp(X, cutpoint, time=time_resolution) res.append(observations) return res
[docs] def temporal_assemble(X: np.array, y:np.array, temporal_moments: list[np.array]): ''' Assembles the data in the temporal moments in order to have partitions with balanced time moments in each one. :param X: data observations. :param y: labels. :param temporal_moments: list of boolean arrays. True values are those contained between the cutpoints in each moment. :return: tuple of lists of data and labels for each temporal moment. First tuple is: X_train, X_test, y_train, y_test Second tuple is: train temporal moments, test temporal moments. ''' moments_partitions = [] train_temporal_boolean_markers = [] test_temporal_boolean_markers = [] train_counter = 0 test_counter = 0 for ix, temporal_moment in enumerate(temporal_moments): X_train, X_test, y_train, y_test = train_test_split(X[temporal_moment], y[temporal_moment], test_size=0.33, random_state=0) moments_partitions.append((X_train, X_test, y_train, y_test)) if isinstance(X_train,(pd.core.series.Series,pd.DataFrame)): X_train = pd.concat([moments_partitions[ix][0] for ix in range(len(moments_partitions))]) X_test = pd.concat([moments_partitions[ix][1] for ix in range(len(moments_partitions))]) y_train = np.concatenate([moments_partitions[ix][2] for ix in range(len(moments_partitions))]) y_test = np.concatenate([moments_partitions[ix][3] for ix in range(len(moments_partitions))]) else: X_train = np.concatenate([moments_partitions[ix][0] for ix in range(len(moments_partitions))]) X_test = np.concatenate([moments_partitions[ix][1] for ix in range(len(moments_partitions))]) y_train = np.concatenate([moments_partitions[ix][2] for ix in range(len(moments_partitions))]) y_test = np.concatenate([moments_partitions[ix][3] for ix in range(len(moments_partitions))]) for ix, temporal_moment in enumerate(temporal_moments): # Believe, this makes sense to avoid rounding errrors in the size of the final vector _, _, y_train0, y_test0 = train_test_split(X[temporal_moment], y[temporal_moment], test_size=0.33, random_state=0) train_moment_observations = np.zeros((X_train.shape[0])) train_moment_observations[train_counter:train_counter+len(y_train0)] = 1 train_counter += len(y_train0) train_temporal_boolean_markers.append(train_moment_observations) test_moment_observations = np.zeros((X_test.shape[0])) test_moment_observations[test_counter:test_counter+len(y_test0)] = 1 test_counter += len(y_test0) test_temporal_boolean_markers.append(test_moment_observations) return [X_train, X_test, y_train, y_test], [train_temporal_boolean_markers, test_temporal_boolean_markers]
[docs] def extend_fuzzy_sets_enum(new_fuzzy_sets_enum: fs.FUZZY_SETS) -> list[fs.FUZZY_SETS]: ''' Extends the fuzzy sets enum with additional types. :param fuzzy_sets_enum: fuzzy sets enum. :return: extended fuzzy sets enum. ''' import enum NEW_FUZZY_SETS = enum.Enum( "FUZZY_SETS", [(es.name, es.value) for es in fs.FUZZY_SETS] + [(es.name, es.value) for es in new_fuzzy_sets_enum] ) fs.FUZZY_SETS = NEW_FUZZY_SETS
[docs] def mcc_loss(ruleBase: rules.RuleBase, X:np.array, y:np.array, tolerance:float, alpha:float=0.99, beta:float=0.0125, gamma:float=0.0125, precomputed_truth=None) -> float: ''' Fitness function for the optimization problem. Uses only the MCC, ignores the size penalization terms. :param ruleBase: RuleBase object :param X: array of train samples. X shape = (n_samples, n_features) :param y: array of train labels. y shape = (n_samples,) :param tolerance: float. Tolerance for the size evaluation. :param alpha: ignored. :param beta: ignored. :param gamma: ignored. :return: float. Fitness value. ''' ev_object = evr.evalRuleBase(ruleBase, X, y, precomputed_truth=precomputed_truth) ev_object.add_rule_weights() score_acc = ev_object.classification_eval() return score_acc