Source code for ex_fuzzy.evolutionary_fit

"""
This is the source file that contains the class to train/fit the rulebase using a genetic algorithm.

"""
import numpy as np
import pandas as pd

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import matthews_corrcoef
from sklearn.base import ClassifierMixin
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.core.problem import Problem
from pymoo.optimize import minimize
from pymoo.operators.sampling.rnd import IntegerRandomSampling
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PolynomialMutation
from pymoo.core.variable import Integer
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import StarmapParallelization

try:
    from . import fuzzy_sets as fs
    from . import rules
    from . import eval_rules as evr
    from . import vis_rules
    from . import maintenance as mnt
except ImportError:
    import fuzzy_sets as fs
    import rules
    import eval_rules as evr
    import vis_rules
    import maintenance as mnt



class BaseFuzzyRulesClassifier(ClassifierMixin):
    '''
    Class that is used as a classifier for a fuzzy rule based system. Supports precomputed and optimization of the linguistic variables.
    '''

    def __init__(self, nRules: int = 30, nAnts: int = 4, fuzzy_type: fs.FUZZY_SETS = fs.FUZZY_SETS.t1, tolerance: float = 0.0,
                 class_names: list[str] = None, n_linguistic_variables: list[int]|int = 3, verbose=False,
                 linguistic_variables: list[fs.fuzzyVariable] = None, domain: list[float] = None, n_class: int = None,
                 precomputed_rules: rules.MasterRuleBase = None, runner: int = 1, ds_mode: int = 0,
                 fuzzy_modifiers: bool = False, allow_unknown: bool = False) -> None:
        '''
        Inits the optimizer with the corresponding parameters.

        :param nRules: number of rules to optimize.
        :param nAnts: max number of antecedents to use.
        :param fuzzy_type: FUZZY_SETS enum type in fuzzy_sets module. The kind of fuzzy set used.
        :param tolerance: tolerance for the dominance score of the rules.
        :param class_names: list of names of the classes in the problem. If None (default) the classifier will compute them empirically.
        :param n_linguistic_variables: number of linguistic variables per antecedent.
        :param verbose: if True, prints the progress of the optimization.
        :param linguistic_variables: list of fuzzyVariable type. If None (default) the optimization process will init+optimize them.
        :param domain: list of the limits for each variable. If None (default) the classifier will compute them empirically.
        :param n_class: number of classes in the problem. If None (default) the classifier will compute it empirically.
        :param precomputed_rules: MasterRuleBase object. If not None, the classifier will use the rules in the object and ignore the conflicting parameters.
        :param runner: number of threads to use. If 1 (default) the classifier will use a single thread.
        :param ds_mode: mode for the dominance score. 0: normal dominance score, 1: rules without weights, 2: weights optimized for each rule based on the data.
        :param fuzzy_modifiers: if True, the classifier will use the modifiers in the optimization process.
        :param allow_unknown: if True, the classifier will allow the unknown class in the classification process. (Which would be a -1 value)
        '''
        if mnt.save_usage_flag:
            mnt.usage_data[mnt.usage_categories.Funcs]['fit'] += 1

        if precomputed_rules is not None:
            self.nRules = len(precomputed_rules.get_rules())
            self.nAnts = len(precomputed_rules.get_rules()[0].antecedents)
            self.n_class = len(precomputed_rules)
            self.nclasses_ = len(precomputed_rules.consequent_names)
            self.classes_names = precomputed_rules.consequent_names
            self.rule_base = precomputed_rules
        else:
            self.nRules = nRules
            self.nAnts = nAnts
            self.nclasses_ = n_class
            if class_names is not None:
                if isinstance(class_names, np.ndarray):
                    self.classes_names = list(class_names)
                else:
                    self.classes_names = class_names
            else:
                self.classes_names = class_names

        self.custom_loss = None
        self.verbose = verbose
        self.tolerance = tolerance
        self.ds_mode = ds_mode
        self.fuzzy_modifiers = fuzzy_modifiers
        self.allow_unknown = allow_unknown

        if runner > 1:
            pool = ThreadPool(runner)
            self.thread_runner = StarmapParallelization(pool.starmap)
        else:
            self.thread_runner = None

        if linguistic_variables is not None:
            if mnt.save_usage_flag:
                mnt.usage_data[mnt.usage_categories.Funcs]['precompute_labels'] += 1

            # If the linguistic variables are precomputed then we act accordingly.
            self.lvs = linguistic_variables
            self.n_linguist_variables = [len(lv.linguistic_variable_names()) for lv in self.lvs]
            self.domain = None
            self.fuzzy_type = self.lvs[0].fuzzy_type()
            if self.nAnts > len(linguistic_variables):
                self.nAnts = len(linguistic_variables)
                if verbose:
                    print('Warning: The number of antecedents is higher than the number of variables. Setting nAnts to the number of linguistic variables. (' + str(len(linguistic_variables)) + ')')
        else:
            if mnt.save_usage_flag:
                mnt.usage_data[mnt.usage_categories.Funcs]['opt_labels'] += 1

            # If not, then we need the parameters supplied by the user.
            self.lvs = None
            self.fuzzy_type = fuzzy_type
            self.n_linguist_variables = n_linguistic_variables
            self.domain = domain

        self.alpha_ = 0.0
        self.beta_ = 0.0

    def customized_loss(self, loss_function):
        '''
        Function to customize the loss function used for the optimization.

        :param loss_function: function that takes as input the true labels and the predicted labels and returns a float.
        :return: None
        '''
        self.custom_loss = loss_function

    def fit(self, X: np.array, y: np.array, n_gen: int = 70, pop_size: int = 30, checkpoints: int = 0,
            candidate_rules: rules.MasterRuleBase = None, initial_rules: rules.MasterRuleBase = None,
            random_state: int = 33, var_prob: float = 0.3, sbx_eta: float = 3.0, mutation_eta=7.0,
            tournament_size=3, bootstrap_size=1000, p_value_compute=False) -> None:
        '''
        Fits a fuzzy rule based classifier using a genetic algorithm to the given data.

        :param X: numpy array samples x features
        :param y: labels. integer array samples (x 1)
        :param n_gen: integer. Number of generations to run the genetic algorithm.
        :param pop_size: integer. Population size for each generation.
        :param checkpoints: integer. Number of checkpoints to save the best rulebase found so far.
        :param candidate_rules: if these rules exist, the optimization process will choose the best rules from this set. If None (default) the rules will be generated from scratch.
        :param initial_rules: if these rules exist, the optimization process will start from this set. If None (default) the rules will be generated from scratch.
        :param random_state: integer. Random seed for the optimization process.
        :param var_prob: float. Probability of crossover for the genetic algorithm.
        :param sbx_eta: float. Eta parameter for the SBX crossover.
        :param mutation_eta: float. Eta parameter for the polynomial mutation.
        :param tournament_size: integer. Size of the tournament for the genetic algorithm.
        :param bootstrap_size: integer. Number of bootstrap samples used if the p-values are computed.
        :param p_value_compute: boolean. If True, computes the permutation and bootstrapping p-values after fitting.
        :return: None. The classifier is fitted to the data.
        '''
        if mnt.save_usage_flag:
            mnt.usage_data[mnt.usage_categories.Funcs]['fit'] += 1

        if self.classes_names is None:
            self.classes_names = [aux for aux in np.unique(y)]

        if self.nclasses_ is None:
            self.nclasses_ = len(self.classes_names)

        if isinstance(y[0], str):
            y = np.array([self.classes_names.index(str(aux)) for aux in y])

        if candidate_rules is None:
            if initial_rules is not None:
                self.fuzzy_type = initial_rules.fuzzy_type()
                self.n_linguist_variables = initial_rules.n_linguistic_variables()
                self.domain = [fv.domain for fv in initial_rules[0].antecedents]
                self.nRules = len(initial_rules.get_rules())
                self.nAnts = len(initial_rules.get_rules()[0].antecedents)

            if self.lvs is None:
                # Check if self.n_linguist_variables is a list or a single value.
                if isinstance(self.n_linguist_variables, int):
                    self.n_linguist_variables = [self.n_linguist_variables for _ in range(X.shape[1])]

                if self.nAnts > X.shape[1]:
                    self.nAnts = X.shape[1]
                    if self.verbose:
                        print('Warning: The number of antecedents is higher than the number of variables. Setting nAnts to the number of variables. (' + str(X.shape[1]) + ')')

                # If fuzzy variables need to be optimized.
                problem = FitRuleBase(X, y, nRules=self.nRules, nAnts=self.nAnts, tolerance=self.tolerance,
                                      n_classes=len(np.unique(y)), n_linguistic_variables=self.n_linguist_variables,
                                      fuzzy_type=self.fuzzy_type, domain=self.domain, thread_runner=self.thread_runner,
                                      alpha=self.alpha_, beta=self.beta_, ds_mode=self.ds_mode,
                                      encode_mods=self.fuzzy_modifiers, allow_unknown=self.allow_unknown)
            else:
                # If fuzzy variables are already precomputed.
                problem = FitRuleBase(X, y, nRules=self.nRules, nAnts=self.nAnts, n_classes=len(np.unique(y)),
                                      linguistic_variables=self.lvs, domain=self.domain, tolerance=self.tolerance,
                                      thread_runner=self.thread_runner, alpha=self.alpha_, beta=self.beta_,
                                      ds_mode=self.ds_mode, encode_mods=self.fuzzy_modifiers,
                                      allow_unknown=self.allow_unknown)
        else:
            self.fuzzy_type = candidate_rules.fuzzy_type()
            self.n_linguist_variables = candidate_rules.n_linguistic_variables()
            problem = ExploreRuleBases(X, y, n_classes=len(np.unique(y)), candidate_rules=candidate_rules,
                                       thread_runner=self.thread_runner, nRules=self.nRules)

        if self.custom_loss is not None:
            problem.fitness_func = self.custom_loss

        if initial_rules is not None:
            rules_gene = problem.encode_rulebase(initial_rules, self.lvs is None)
            rules_gene = (np.ones((pop_size, len(rules_gene))) * rules_gene).astype(int)
        else:
            rules_gene = IntegerRandomSampling()

        algorithm = GA(
            pop_size=pop_size,
            crossover=SBX(prob=var_prob, eta=sbx_eta),
            mutation=PolynomialMutation(eta=mutation_eta),
            tournament_size=tournament_size,
            sampling=rules_gene,
            eliminate_duplicates=False)

        if checkpoints > 0:
            if self.verbose:
                print('=================================================')
                print('n_gen | n_eval | f_avg | f_min ')
                print('=================================================')
            algorithm.setup(problem, seed=random_state, termination=('n_gen', n_gen))
            for k in range(n_gen):
                algorithm.next()
                res = algorithm
                if self.verbose:
                    print('%-6s | %-8s | %-8s | %-8s' % (res.n_gen, res.evaluator.n_eval, res.pop.get('F').mean(), res.pop.get('F').min()))
                if k % checkpoints == 0:
                    with open("checkpoint_" + str(algorithm.n_gen), "w") as f:
                        pop = algorithm.pop
                        fitness_last_gen = pop.get('F')
                        best_solution_arg = np.argmin(fitness_last_gen)
                        best_individual = pop.get('X')[best_solution_arg, :]

                        rule_base = problem._construct_ruleBase(
                            best_individual, self.fuzzy_type)
                        eval_performance = evr.evalRuleBase(
                            rule_base, np.array(X), y)

                        eval_performance.add_full_evaluation()
                        # self.rename_fuzzy_variables() won't work on checkpoints!
                        rule_base.purge_rules(self.tolerance)
                        rule_base.rename_cons(self.classes_names)
                        checkpoint_rules = rule_base.print_rules(True)
                        f.write(checkpoint_rules)
        else:
            res = minimize(problem,
                           algorithm,
                           # termination,
                           ("n_gen", n_gen),
                           seed=random_state,
                           copy_algorithm=False,
                           save_history=False,
                           verbose=self.verbose)

        pop = res.pop
        fitness_last_gen = pop.get('F')
        best_solution = np.argmin(fitness_last_gen)
        best_individual = pop.get('X')[best_solution, :]
        self.performance = 1 - fitness_last_gen[best_solution]

        try:
            self.var_names = list(X.columns)
            self.X = X.values
        except AttributeError:
            self.X = X
            self.var_names = [str(ix) for ix in range(X.shape[1])]

        self.rule_base = problem._construct_ruleBase(
            best_individual, self.fuzzy_type)
        self.eval_performance = evr.evalRuleBase(
            self.rule_base, np.array(X), y)

        self.eval_performance.add_full_evaluation()
        self.rule_base.purge_rules(self.tolerance)
        self.eval_performance.add_full_evaluation()  # After purging the bad rules we update the metrics.

        if p_value_compute:
            self.p_value_validation(bootstrap_size)

        self.rule_base.rename_cons(self.classes_names)
        if self.lvs is None:
            self.rename_fuzzy_variables()

    def p_value_validation(self, bootstrap_size: int = 100):
        '''
        Computes the permutation and bootstrapping p-values for the classifier and its rules.

        :param bootstrap_size: integer. Number of bootstrap samples to use.
        '''
        self.p_value_class_structure, self.p_value_feature_coalitions = self.eval_performance.p_permutation_classifier_validation()
        self.eval_performance.p_bootstrapping_rules_validation(bootstrap_size)

    def load_master_rule_base(self, rule_base: rules.MasterRuleBase) -> None:
        '''
        Loads a master rule base to be used in the prediction process.

        :param rule_base: ruleBase object.
        :return: None
        '''
        self.rule_base = rule_base
        self.nRules = len(rule_base.get_rules())
        self.nAnts = len(rule_base.get_rules()[0].antecedents)
        self.nclasses_ = len(rule_base)

    def forward(self, X: np.array, out_class_names=False) -> np.array:
        '''
        Returns the predicted class for each sample.

        :param X: np array samples x features.
        :param out_class_names: if True, the output will be the class names instead of the class index.
        :return: np array samples (x 1) with the predicted class.
        '''
        try:
            X = X.values  # If X was a pandas dataframe
        except AttributeError:
            pass

        return self.rule_base.winning_rule_predict(X, out_class_names=out_class_names)

    def predict(self, X: np.array, out_class_names=False) -> np.array:
        '''
        Returns the predicted class for each sample.

        :param X: np array samples x features.
        :param out_class_names: if True, the output will be the class names instead of the class index.
        :return: np array samples (x 1) with the predicted class.
        '''
        return self.forward(X, out_class_names=out_class_names)

    def predict_proba(self, X: np.array) -> np.array:
        '''
        Returns the predicted class probabilities for each sample.

        :param X: np array samples x features.
        :return: np array samples x classes with the predicted class probabilities.
        '''
        try:
            X = X.values  # If X was a pandas dataframe
        except AttributeError:
            pass

        return self.rule_base.compute_association_degrees(X)

    def print_rules(self, return_rules: bool = False) -> None:
        '''
        Print the rules contained in the fitted rulebase.

        :param return_rules: if True, the rules are returned as a string instead of printed.
        '''
        return self.rule_base.print_rules(return_rules)

    def plot_fuzzy_variables(self) -> None:
        '''
        Plot the fuzzy partitions in each fuzzy variable.
        '''
        fuzzy_variables = self.rule_base.rule_bases[0].antecedents

        for fv in fuzzy_variables:
            vis_rules.plot_fuzzy_variable(fv)

    def rename_fuzzy_variables(self) -> None:
        '''
        Renames the linguistic labels so that high, low, and so on are consistent. It does so usually after an optimization process.

        :return: None. Names are sorted according to the central point of the fuzzy memberships.
        '''
        for ix in range(len(self.rule_base)):
            fuzzy_variables = self.rule_base.rule_bases[ix].antecedents

            for jx, fv in enumerate(fuzzy_variables):
                new_order_values = []
                possible_names = FitRuleBase.vl_names[self.n_linguist_variables[jx]]

                for zx, fuzzy_set in enumerate(fv.linguistic_variables):
                    studied_fz = fuzzy_set.type()

                    if studied_fz == fs.FUZZY_SETS.temporal:
                        studied_fz = fuzzy_set.inside_type()

                    if studied_fz == fs.FUZZY_SETS.t1:
                        f1 = np.mean(
                            fuzzy_set.membership_parameters[0] + fuzzy_set.membership_parameters[1])
                    elif studied_fz == fs.FUZZY_SETS.t2:
                        f1 = np.mean(
                            fuzzy_set.secondMF_upper[0] + fuzzy_set.secondMF_upper[1])
                    elif studied_fz == fs.FUZZY_SETS.gt2:
                        sec_memberships = fuzzy_set.secondary_memberships.values()
                        f1 = float(list(fuzzy_set.secondary_memberships.keys())[np.argmax(
                            [fzm.membership_parameters[2] for kx, fzm in enumerate(sec_memberships)])])

                    new_order_values.append(f1)

                new_order = np.argsort(np.array(new_order_values))
                fuzzy_sets_vl = fv.linguistic_variables

                for pos, x in enumerate(new_order):
                    fuzzy_sets_vl[x].name = possible_names[pos]

    def get_rulebase(self) -> list[np.array]:
        '''
        Get the rulebase obtained after fitting the classifier to the data.

        :return: a matrix format for the rulebase.
        '''
        return self.rule_base.get_rulebase_matrix()

    def reparametrice_loss(self, alpha: float, beta: float) -> None:
        '''
        Changes the parameters in the loss function.

        :note: Does not check for convexity preservation. The user can play with these parameters at will.
        :param alpha: controls the MCC term.
        :param beta: controls the average rule size loss.
        '''
        self.alpha_ = alpha
        self.beta_ = beta

    def __call__(self, X: np.array) -> np.array:
        '''
        Returns the predicted class for each sample.

        :param X: np array samples x features.
        :return: np array samples (x 1) with the predicted class.
        '''
        return self.predict(X)
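

# The function below is an illustrative sketch added for documentation purposes,
# not part of the original module: it shows a typical fit/predict round trip on
# synthetic data, including the reparametrice_loss hook. The dataset shape and
# every parameter value here are arbitrary assumptions.
def _example_classifier_usage():
    X = np.random.rand(200, 4)             # 200 samples, 4 features
    y = np.random.randint(0, 3, 200)       # 3 classes

    clf = BaseFuzzyRulesClassifier(nRules=15, nAnts=3, n_linguistic_variables=3, verbose=False)
    clf.reparametrice_loss(alpha=0.1, beta=0.05)  # mildly penalize rulebase size
    clf.fit(X, y, n_gen=20, pop_size=20)

    preds = clf.predict(X)                  # class indices
    probs = clf.predict_proba(X)            # association degrees per class
    clf.print_rules()
    return preds, probs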


class ExploreRuleBases(Problem):
    '''
    Class to model as pymoo problem the fitting of a rulebase to a set of data given a series of candidate rules for a classification problem using Evolutionary strategies. Supports type 1 and t2.
    '''

    def __init__(self, X: np.array, y: np.array, nRules: int, n_classes: int, candidate_rules: rules.MasterRuleBase, thread_runner: StarmapParallelization = None, tolerance: float = 0.01) -> None:
        '''
        Constructor method. Initializes the classifier with the number of antecedents, linguistic variables and the kind of fuzzy set desired.

        :param X: np array or pandas dataframe samples x features.
        :param y: np vector containing the target classes. vector sample
        :param nRules: number of rules to select from the candidate set.
        :param n_classes: number of classes in the problem. If None (as default) it will be computed from the data.
        :param candidate_rules: MasterRuleBase object. If not None, the classifier will use the rules in the object and ignore the conflicting parameters.
        :param thread_runner: StarmapParallelization object to evaluate the population in parallel. If None, evaluation is sequential.
        :param tolerance: float. Tolerance for the dominance score of the rules.
        '''
        try:
            self.var_names = list(X.columns)
            self.X = X.values
        except AttributeError:
            self.X = X
            self.var_names = [str(ix) for ix in range(X.shape[1])]

        self.tolerance = tolerance
        self.fuzzy_type = candidate_rules.fuzzy_type()
        self.y = y
        self.nCons = 1  # This is fixed to MISO rules.
        self.n_classes = n_classes
        self.candidate_rules = candidate_rules
        self.nRules = nRules
        self._precomputed_truth = rules.compute_antecedents_memberships(candidate_rules.get_antecedents(), X)

        self.fuzzy_type = self.candidate_rules[0].antecedents[0].fuzzy_type()
        self.min_bounds = np.min(self.X, axis=0)
        self.max_bounds = np.max(self.X, axis=0)

        nTotalRules = len(self.candidate_rules.get_rules())
        # Each var is the index of a candidate rule to include in the rulebase.
        vars = {ix: Integer(bounds=[0, nTotalRules - 1]) for ix in range(self.nRules)}
        varbound = np.array([[0, nTotalRules - 1]] * self.nRules)

        nVar = len(vars.keys())
        if thread_runner is not None:
            super().__init__(
                vars=vars,
                n_var=nVar,
                n_obj=1,
                elementwise=True,
                vtype=int,
                xl=varbound[:, 0],
                xu=varbound[:, 1],
                elementwise_runner=thread_runner)
        else:
            super().__init__(
                vars=vars,
                n_var=nVar,
                n_obj=1,
                elementwise=True,
                vtype=int,
                xl=varbound[:, 0],
                xu=varbound[:, 1])

    def _construct_ruleBase(self, x: np.array, fuzzy_type: fs.FUZZY_SETS, ds_mode: int = 0, allow_unknown: bool = False) -> rules.MasterRuleBase:
        '''
        Creates a valid rulebase from the given subject and the candidate rules.

        :param x: gen of a rulebase. type: dict.
        :param fuzzy_type: FUZZY_SETS enum type in fuzzy_sets module. The kind of fuzzy set used.
        :param ds_mode: int. Mode for the dominance score. 0: normal dominance score, 1: rules without weights, 2: weights optimized for each rule based on the data.
        :param allow_unknown: if True, the classifier will allow the unknown class in the classification process. (Which would be a -1 value)

        :return: a Master rulebase object.
        '''
        x = x.astype(int)
        # Get all rules and their consequents
        diff_consequents = np.arange(len(self.candidate_rules))

        # Choose the selected ones in the gen: each gene value indexes a candidate rule.
        total_rules = self.candidate_rules.get_rules()
        chosen_rules = [total_rules[val] for ix, val in enumerate(x)]
        rule_consequents = sum([[ix] * len(rule) for ix, rule in enumerate(self.candidate_rules)], [])
        chosen_rules_consequents = [rule_consequents[val] for ix, val in enumerate(x)]

        # Create a rule base for each consequent with the selected rules
        rule_list = [[] for _ in range(self.n_classes)]
        rule_bases = []
        for ix, consequent in enumerate(diff_consequents):
            for rx, rule in enumerate(chosen_rules):
                if chosen_rules_consequents[rx] == consequent:
                    rule_list[ix].append(rule)

            if len(rule_list[ix]) > 0:
                if fuzzy_type == fs.FUZZY_SETS.t1:
                    rule_base_cons = rules.RuleBaseT1(
                        self.candidate_rules[0].antecedents, rule_list[ix])
                elif fuzzy_type == fs.FUZZY_SETS.t2:
                    rule_base_cons = rules.RuleBaseT2(
                        self.candidate_rules[0].antecedents, rule_list[ix])
                elif fuzzy_type == fs.FUZZY_SETS.gt2:
                    rule_base_cons = rules.RuleBaseGT2(
                        self.candidate_rules[0].antecedents, rule_list[ix])

                rule_bases.append(rule_base_cons)

        # Create the Master Rule Base object with the individual rule bases
        newMasterRuleBase = rules.MasterRuleBase(rule_bases, diff_consequents, ds_mode=ds_mode, allow_unknown=allow_unknown)

        return newMasterRuleBase

    def _evaluate(self, x: np.array, out: dict, *args, **kwargs):
        '''
        :param x: array of train samples. x shape = features
            those features are the parameters to optimize.

        :param out: dict where the F field is the fitness. It is used from the outside.
        '''
        try:
            ruleBase = self._construct_ruleBase(x, self.fuzzy_type)
            score = self.fitness_func(ruleBase, self.X, self.y, self.tolerance, precomputed_truth=self._precomputed_truth)
            out["F"] = 1 - score
        except rules.RuleError:
            out["F"] = 1

    def fitness_func(self, ruleBase: rules.RuleBase, X: np.array, y: np.array, tolerance: float, alpha: float = 0.0, beta: float = 0.0, precomputed_truth=None) -> float:
        '''
        Fitness function for the optimization problem.

        :param ruleBase: RuleBase object
        :param X: array of train samples. X shape = (n_samples, n_features)
        :param y: array of train labels. y shape = (n_samples,)
        :param tolerance: float. Tolerance for the size evaluation.
        :param alpha: float. Weight for the rulebase size term.
        :param beta: float. Weight for the average rule size term.
        :param precomputed_truth: np array. If given, it will be used as the truth values for the evaluation.
        :return: float. Fitness value.
        '''
        ev_object = evr.evalRuleBase(ruleBase, X, y, precomputed_truth=precomputed_truth)
        ev_object.add_rule_weights()

        score_acc = ev_object.classification_eval()
        score_rules_size = ev_object.size_antecedents_eval(tolerance)
        score_nrules = ev_object.effective_rulesize_eval(tolerance)

        score = score_acc + score_rules_size * alpha + score_nrules * beta

        return score
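

# Illustrative sketch added for documentation purposes, not part of the original
# module: ExploreRuleBases is normally driven through
# BaseFuzzyRulesClassifier.fit(candidate_rules=...), which selects the best
# subset of an existing MasterRuleBase. The candidate rulebase here is assumed
# to come from a previously fitted classifier.
def _example_rule_selection(X, y, previous_classifier):
    candidate_rules = previous_classifier.rule_base  # a rules.MasterRuleBase

    selector = BaseFuzzyRulesClassifier(nRules=10, verbose=False)
    # fit() builds an ExploreRuleBases problem instead of FitRuleBase when
    # candidate_rules is given.
    selector.fit(X, y, n_gen=15, pop_size=15, candidate_rules=candidate_rules)
    return selector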


class FitRuleBase(Problem):
    '''
    Class to model as pymoo problem the fitting of a rulebase for a classification problem using Evolutionary strategies. Supports type 1 and iv fs (iv-type 2)
    '''

    def _init_optimize_vl(self, fuzzy_type: fs.FUZZY_SETS, n_linguist_variables: list[int], domain: list[(float, float)] = None):
        '''
        Inits the corresponding fields if no linguistic partitions were given.

        :param fuzzy_type: FUZZY_SETS enum type in fuzzy_sets module. The kind of fuzzy set used.
        :param n_linguist_variables: number of linguistic variables per antecedent.
        :param domain: list of the limits for each variable. If None (default) the classifier will compute them empirically.
        '''
        self.lvs = None
        self.vl_names = [FitRuleBase.vl_names[n_linguist_variables[nn]] if n_linguist_variables[nn] < 6
                         else list(map(str, np.arange(n_linguist_variables[nn])))
                         for nn in range(len(n_linguist_variables))]
        self.fuzzy_type = fuzzy_type
        self.n_lv_possible = n_linguist_variables
        self.domain = domain
        self._precomputed_truth = None

    def _init_precomputed_vl(self, linguist_variables: list[fs.fuzzyVariable], X: np.array):
        '''
        Inits the corresponding fields if linguistic partitions for each variable are given.

        :param linguist_variables: list of fuzzyVariable type.
        :param X: np array samples x features.
        '''
        self.lvs = linguist_variables
        self.vl_names = [lv.linguistic_variable_names() for lv in self.lvs]
        self.n_lv_possible = [len(lv.linguistic_variable_names()) for lv in self.lvs]
        self.fuzzy_type = self.lvs[0].fs_type
        self.domain = None
        self._precomputed_truth = rules.compute_antecedents_memberships(linguist_variables, X)

    vl_names = [  # Linguistic variable names prenamed for some specific cases.
        [],
        [],
        ['Low', 'High'],
        ['Low', 'Medium', 'High'],
        ['Low', 'Medium', 'High', 'Very High'],
        ['Very Low', 'Low', 'Medium', 'High', 'Very High']
    ]

    def __init__(self, X: np.array, y: np.array, nRules: int, nAnts: int, n_classes: int, thread_runner: StarmapParallelization = None,
                 linguistic_variables: list[fs.fuzzyVariable] = None, n_linguistic_variables: int = 3, fuzzy_type=fs.FUZZY_SETS.t1,
                 domain: list = None, tolerance: float = 0.01, alpha: float = 0.0, beta: float = 0.0, ds_mode: int = 0,
                 encode_mods: bool = False, allow_unknown: bool = False) -> None:
        '''
        Constructor method. Initializes the classifier with the number of antecedents, linguistic variables and the kind of fuzzy set desired.

        :param X: np array or pandas dataframe samples x features.
        :param y: np vector containing the target classes. vector sample
        :param nRules: number of rules to optimize.
        :param nAnts: max number of antecedents to use.
        :param n_classes: number of classes in the problem. If None (as default) it will be computed from the data.
        :param linguistic_variables: list of linguistic variables precomputed. If given, the rest of conflicting arguments are ignored.
        :param n_linguistic_variables: number of linguistic variables per antecedent.
        :param fuzzy_type: Define the fuzzy set or fuzzy set extension used as linguistic variable.
        :param domain: list with the upper and lower domains of each input variable. If None (as default) it will establish the empirical min/max as the limits.
        :param tolerance: float. Tolerance for the size evaluation.
        :param alpha: float. Weight for the rulebase size term in the fitness function. (Penalizes number of rules)
        :param beta: float. Weight for the average rule size term in the fitness function.
        :param ds_mode: int. Mode for the dominance score. 0: normal dominance score, 1: rules without weights, 2: weights optimized for each rule based on the data.
        :param encode_mods: bool. If True, the optimization process will include the modifiers for the membership functions.
        :param allow_unknown: if True, the classifier will allow the unknown class in the classification process. (Which would be a -1 value)
        '''
        try:
            self.var_names = list(X.columns)
            self.X = X.values
        except AttributeError:
            self.X = X
            self.var_names = [str(ix) for ix in range(X.shape[1])]

        self.tolerance = tolerance
        self.y = y
        self.classes_names = np.unique(y)
        self.nRules = nRules
        self.nAnts = nAnts
        self.nCons = 1  # This is fixed to MISO rules.
        self.ds_mode = ds_mode
        self.encode_mods = encode_mods
        self.allow_unknown = allow_unknown

        if n_classes is not None:
            self.n_classes = n_classes
        else:
            self.n_classes = len(np.unique(y))

        if linguistic_variables is not None:
            self._init_precomputed_vl(linguistic_variables, X)
        else:
            if isinstance(n_linguistic_variables, int):
                n_linguistic_variables = [n_linguistic_variables] * self.X.shape[1]
            self._init_optimize_vl(fuzzy_type, n_linguistic_variables, domain)

        if self.domain is None:
            # If all the variables are numerical, then we can compute the min/max of the domain.
            if np.all([np.issubdtype(self.X[:, ix].dtype, np.number) for ix in range(self.X.shape[1])]):
                self.min_bounds = np.min(self.X, axis=0)
                self.max_bounds = np.max(self.X, axis=0)
            else:
                self.min_bounds = np.zeros(self.X.shape[1])
                self.max_bounds = np.zeros(self.X.shape[1])
                for ix in range(self.X.shape[1]):
                    if np.issubdtype(self.X[:, ix].dtype, np.number):
                        self.min_bounds[ix] = np.min(self.X[:, ix])
                        self.max_bounds[ix] = np.max(self.X[:, ix])
                    else:
                        self.min_bounds[ix] = 0
                        self.max_bounds[ix] = len(np.unique(self.X[:, ix]))
        else:
            self.min_bounds, self.max_bounds = self.domain

        self.antecedents_referencial = [np.linspace(
            self.min_bounds[ix], self.max_bounds[ix], 100) for ix in range(self.X.shape[1])]

        possible_antecedent_bounds = np.array(
            [[0, self.X.shape[1] - 1]] * self.nAnts * self.nRules)
        vl_antecedent_bounds = np.array(
            [[-1, self.n_lv_possible[ax] - 1] for ax in range(self.nAnts)] * self.nRules)  # -1 means don't care
        antecedent_bounds = np.concatenate(
            (possible_antecedent_bounds, vl_antecedent_bounds))
        vars_antecedent = {ix: Integer(
            bounds=antecedent_bounds[ix]) for ix in range(len(antecedent_bounds))}
        aux_counter = len(vars_antecedent)

        if self.lvs is None:
            self.feature_domain_bounds = np.array(
                [[0, 99] for ix in range(self.X.shape[1])])
            size_multiplier = 4 if self.fuzzy_type == fs.FUZZY_SETS.t1 else 8
            membership_bounds = np.concatenate(
                [[self.feature_domain_bounds[ixx]] * size_multiplier * self.n_lv_possible[ixx] for ixx in range(len(self.n_lv_possible))])
            vars_memberships = {
                aux_counter + ix: Integer(bounds=membership_bounds[ix]) for ix in range(len(membership_bounds))}
            aux_counter += len(vars_memberships)

        final_consequent_bounds = np.array(
            [[-1, self.n_classes - 1]] * self.nRules)
        vars_consequent = {aux_counter + ix: Integer(
            bounds=final_consequent_bounds[ix]) for ix in range(len(final_consequent_bounds))}

        if self.lvs is None:
            vars = {key: val for d in [
                vars_antecedent, vars_memberships, vars_consequent] for key, val in d.items()}
            varbound = np.concatenate(
                (antecedent_bounds, membership_bounds, final_consequent_bounds), axis=0)
        else:
            vars = {key: val for d in [vars_antecedent, vars_consequent] for key, val in d.items()}
            varbound = np.concatenate(
                (antecedent_bounds, final_consequent_bounds), axis=0)

        if self.ds_mode == 2:
            weights_bounds = np.array([[0, 99] for ix in range(self.nRules)])
            vars_weights = {max(vars.keys()) + 1 + ix: Integer(
                bounds=weights_bounds[ix]) for ix in range(len(weights_bounds))}
            vars = {key: val for d in [vars, vars_weights] for key, val in d.items()}
            varbound = np.concatenate((varbound, weights_bounds), axis=0)

        if encode_mods:
            # Now we add modifier exponents for the membership functions.
            rule_mods = np.array([[0, len(rules.modifiers_names.keys()) - 1]] * self.nAnts * self.nRules)
            vars_modifiers = {max(vars.keys()) + 1 + ix: Integer(
                bounds=rule_mods[ix]) for ix in range(len(rule_mods))}
            vars = {key: val for d in [vars, vars_modifiers] for key, val in d.items()}
            varbound = np.concatenate((varbound, rule_mods), axis=0)

        nVar = len(varbound)
        self.single_gen_size = nVar
        self.alpha_ = alpha
        self.beta_ = beta

        if thread_runner is not None:
            super().__init__(
                vars=vars,
                n_var=nVar,
                n_obj=1,
                elementwise=True,
                vtype=int,
                xl=varbound[:, 0],
                xu=varbound[:, 1],
                elementwise_runner=thread_runner)
        else:
            super().__init__(
                vars=vars,
                n_var=nVar,
                n_obj=1,
                elementwise=True,
                vtype=int,
                xl=varbound[:, 0],
                xu=varbound[:, 1])

    def encode_rulebase(self, rule_base: rules.MasterRuleBase, optimize_lv: bool, encode_mods: bool = False) -> np.array:
        '''
        Given a rule base, constructs the corresponding gene associated with that rule base.

        GENE STRUCTURE

        First: antecedents chosen by each rule. Size: nAnts * nRules (index of the antecedent)
        Second: Variable linguistics used. Size: nAnts * nRules
        Third: Parameters for the fuzzy partitions of the chosen variables. Size: nAnts * self.n_linguistic_variables * 8|4 (2 trapezoidal memberships if t2)
        Fourth: Consequent classes. Size: nRules

        :param rule_base: rule base object.
        :param optimize_lv: if True, the gene is prepared to optimize the membership functions.
        :param encode_mods: if True, the gene is prepared to encode the modifiers for the membership functions.
        :return: np array of size self.single_gen_size.
        '''
        gene = np.zeros((self.single_gen_size,))

        n_lv_possible = len(rule_base.rule_bases[0].antecedents[0].linguistic_variables)
        fuzzy_type = rule_base.fuzzy_type()
        rule_consequents = rule_base.get_consequents()
        nreal_rules = len(rule_consequents)
        mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 8

        # Pointer to the fourth section of the gene: consequents
        if optimize_lv:
            # If lv memberships are optimized.
            fourth_pointer = 2 * self.nAnts * self.nRules + \
                len(rule_base.antecedents) * n_lv_possible * mf_size
        else:
            # If no memberships are optimized.
            fourth_pointer = 2 * self.nAnts * self.nRules

        # Pointer to the fifth section of the gene: weights (if they exist)
        fifth_pointer = fourth_pointer + self.nRules
        if rule_base.ds_mode == 2:
            for ix, rule in enumerate(rule_base.get_rules()):
                gene[fifth_pointer + ix] = rule.weight

        # Last pointer of the gene: modifiers for the membership functions
        if encode_mods:
            if rule_base.ds_mode == 2:
                sixth_pointer = fifth_pointer + self.nRules  # The weights take one gene slot per rule.
            else:
                sixth_pointer = fifth_pointer

            for ix, rule in enumerate(rule_base.get_rules()):
                for jx, modifier in enumerate(rule.modifiers):
                    mod_idx = list(rules.modifiers_names.keys()).index(modifier)
                    gene[sixth_pointer + ix * self.nAnts + jx] = mod_idx

        # First and second sections of the gene: antecedents and linguistic variables
        for i0, rule in enumerate(rule_base.get_rules()):  # Reconstruct the rules
            first_pointer = i0 * self.nAnts
            second_pointer = (self.nRules * self.nAnts) + i0 * self.nAnts

            for ax, linguistic_variable in enumerate(rule.antecedents):
                gene[first_pointer + ax] = ax
                gene[second_pointer + ax] = linguistic_variable

            # Update the fourth section of the gene: consequents, using the fourth pointer
            gene[fourth_pointer + i0] = rule_consequents[i0]

        # Fill the rest of the rules with don't care values
        nvoid_rules = self.nRules - nreal_rules
        for vx in range(nvoid_rules):
            first_pointer = nreal_rules * self.nAnts + vx * self.nAnts
            second_pointer = (self.nRules * self.nAnts) + nreal_rules * self.nAnts + vx * self.nAnts

            for ax, linguistic_variable in enumerate(rule.antecedents):
                gene[first_pointer + ax] = ax
                gene[second_pointer + ax] = -1

            # Update the fourth section of the gene: consequents, using the fourth pointer
            gene[fourth_pointer + nreal_rules + vx] = -1

        if optimize_lv:
            # If lv memberships are optimized.
            third_pointer = 2 * self.nAnts * self.nRules
            aux_pointer = 0

            for ix, fuzzy_variable in enumerate(rule_base.get_antecedents()):
                for linguistic_variable in range(n_lv_possible):
                    fz_parameters = fuzzy_variable[linguistic_variable].membership_parameters
                    for jx, fz_parameter in enumerate(fz_parameters):
                        closest_idx = (np.abs(np.asarray(self.antecedents_referencial[ix]) - fz_parameter)).argmin()
                        gene[third_pointer + aux_pointer] = closest_idx
                        aux_pointer += 1

        return np.array(list(map(int, gene)))

    def _construct_ruleBase(self, x: np.array, fuzzy_type: fs.FUZZY_SETS, **kwargs) -> rules.MasterRuleBase:
        '''
        Given a subject, it creates a rulebase according to its specification.

        :param x: gen of a rulebase. type: dict.
        :param fuzzy_type: an enum type. Check fuzzy_sets for the complete specification (two fields, t1 and t2, to mark which fs you want to use)
        :param kwargs: additional parameters to pass to the rule.
        :return: a rulebase object.

        kwargs:
            - time_moment: if temporal fuzzy sets are used with different partitions for each time interval, then this parameter is used to specify which time moment is being used.
        '''
        rule_list = [[] for _ in range(self.n_classes)]

        mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 8
        '''
        GEN STRUCTURE

        First: antecedents chosen by each rule. Size: nAnts * nRules
        Second: Variable linguistics used. Size: nAnts * nRules
        Third: Parameters for the fuzzy partitions of the chosen variables. Size: X.shape[1] * self.n_linguistic_variables * 8|4 (2 trapezoidal memberships if t2)
        Fourth: Consequent classes. Size: nRules
        Fifth: Weights for each rule. Size: nRules (only if ds_mode == 2)
        '''
        if self.lvs is None:
            # If memberships are optimized.
            fourth_pointer = 2 * self.nAnts * self.nRules + \
                sum(self.n_lv_possible) * mf_size
        else:
            # If no memberships are optimized.
            fourth_pointer = 2 * self.nAnts * self.nRules

        # Weights, if present, start right after the consequents; modifiers come after the weights.
        fifth_pointer = fourth_pointer + self.nRules
        if self.ds_mode == 2:
            sixth_pointer = fifth_pointer + self.nRules
        else:
            sixth_pointer = fifth_pointer

        aux_pointer = 0
        min_domain = np.min(self.X, axis=0)
        max_domain = np.max(self.X, axis=0)

        # Integer sampling doesn't work fine in pymoo, so we do this (which is btw what pymoo is really doing if you just set integer optimization)
        try:
            # subject might come as a dict.
            x = np.array(list(x.values())).astype(int)
        except AttributeError:
            x = x.astype(int)

        for i0 in range(self.nRules):  # Reconstruct the rules
            first_pointer = i0 * self.nAnts
            chosen_ants = x[first_pointer:first_pointer + self.nAnts]

            second_pointer = (i0 * self.nAnts) + (self.nAnts * self.nRules)
            # Shape: self.nAnts + self.n_lv_possible + 1
            antecedent_parameters = x[second_pointer:second_pointer + self.nAnts]

            init_rule_antecedents = np.zeros(
                (self.X.shape[1],)) - 1  # -1 is don't care
            for jx, ant in enumerate(chosen_ants):
                if self.lvs is not None:
                    antecedent_parameters[jx] = min(antecedent_parameters[jx], len(self.lvs[ant]) - 1)
                else:
                    antecedent_parameters[jx] = min(antecedent_parameters[jx], self.n_lv_possible[ant] - 1)

                init_rule_antecedents[ant] = antecedent_parameters[jx]

            consequent_idx = x[fourth_pointer + aux_pointer]
            assert consequent_idx < self.n_classes, "Consequent class is not valid. Something in the gene is wrong."
            aux_pointer += 1

            if self.ds_mode == 2:
                rule_weight = x[fifth_pointer + i0] / 100
            else:
                rule_weight = 1.0

            # Last section of the gene: modifiers for the membership functions
            if self.encode_mods:
                rule_modifiers = np.ones((self.X.shape[1],)) * -1  # One slot per feature, -1 means no modifier.
                idx_mods = x[sixth_pointer + i0 * self.nAnts: sixth_pointer + (i0 + 1) * self.nAnts]
                for jx, ant in enumerate(chosen_ants):
                    rule_modifiers[ant] = list(rules.modifiers_names.keys())[idx_mods[jx]]
            else:
                rule_modifiers = None

            if consequent_idx != -1 and np.any(init_rule_antecedents != -1):
                rs_instance = rules.RuleSimple(init_rule_antecedents, 0, rule_modifiers)
                if self.ds_mode == 1 or self.ds_mode == 2:
                    rs_instance.weight = rule_weight

                rule_list[consequent_idx].append(rs_instance)

        # If we optimize the membership functions
        if self.lvs is None:
            third_pointer = 2 * self.nAnts * self.nRules
            aux_pointer = 0
            antecedents = []

            for fuzzy_variable in range(self.X.shape[1]):
                linguistic_variables = []

                for linguistic_variable in range(self.n_lv_possible[fuzzy_variable]):
                    parameter_pointer = third_pointer + aux_pointer
                    fz_parameters_idx = x[parameter_pointer:parameter_pointer + mf_size]
                    fz_parameters = self.antecedents_referencial[fuzzy_variable][fz_parameters_idx]
                    aux_pointer += mf_size

                    if fuzzy_type == fs.FUZZY_SETS.t2:
                        fz_parameters[0:6] = np.sort(fz_parameters[0:6])
                        mu = [np.min(fz_parameters[0:2]), fz_parameters[2],
                              fz_parameters[3], np.max(fz_parameters[4:6])]
                        ml = [np.max(fz_parameters[0:2]), fz_parameters[2],
                              fz_parameters[3], np.min(fz_parameters[4:6])]
                        height = fz_parameters[6] / np.max(fz_parameters)

                        ivfs = fs.IVFS(self.vl_names[fuzzy_variable][linguistic_variable],
                                       ml, mu,
                                       (min_domain[fuzzy_variable], max_domain[fuzzy_variable]), lower_height=height)
                    else:
                        ivfs = fs.FS(self.vl_names[fuzzy_variable][linguistic_variable],
                                     np.sort(fz_parameters[0:4]),
                                     (min_domain[fuzzy_variable], max_domain[fuzzy_variable]))
                    linguistic_variables.append(ivfs)

                antecedents.append(fs.fuzzyVariable(
                    self.var_names[fuzzy_variable], linguistic_variables))
        else:
            try:
                antecedents = self.lvs[kwargs['time_moment']]
            except KeyError:
                antecedents = self.lvs

        for i in range(self.n_classes):
            if fuzzy_type == fs.FUZZY_SETS.temporal:
                fuzzy_type = self.lvs[0][0].inside_type()

            if fuzzy_type == fs.FUZZY_SETS.t1:
                rule_base = rules.RuleBaseT1(antecedents, rule_list[i])
            elif fuzzy_type == fs.FUZZY_SETS.t2:
                rule_base = rules.RuleBaseT2(antecedents, rule_list[i])
            elif fuzzy_type == fs.FUZZY_SETS.gt2:
                rule_base = rules.RuleBaseGT2(antecedents, rule_list[i])

            if i == 0:
                res = rules.MasterRuleBase([rule_base], self.classes_names, ds_mode=self.ds_mode, allow_unknown=self.allow_unknown)
            else:
                res.add_rule_base(rule_base)

        res.rename_cons(self.classes_names)

        return res

    def _evaluate(self, x: np.array, out: dict, *args, **kwargs):
        '''
        :param x: array of train samples. x shape = features
            those features are the parameters to optimize.

        :param out: dict where the F field is the fitness. It is used from the outside.
        '''
        ruleBase = self._construct_ruleBase(x, self.fuzzy_type)

        if len(ruleBase.get_rules()) > 0:
            score = self.fitness_func(ruleBase, self.X, self.y, self.tolerance, self.alpha_, self.beta_, self._precomputed_truth)
        else:
            score = 0.0

        out["F"] = 1 - score

    def fitness_func(self, ruleBase: rules.RuleBase, X: np.array, y: np.array, tolerance: float, alpha: float = 0.0, beta: float = 0.0, precomputed_truth: np.array = None) -> float:
        '''
        Fitness function for the optimization problem.

        :param ruleBase: RuleBase object
        :param X: array of train samples. X shape = (n_samples, n_features)
        :param y: array of train labels. y shape = (n_samples,)
        :param tolerance: float. Tolerance for the size evaluation.
        :param alpha: float. Weight for the rulebase size term.
        :param beta: float. Weight for the average rule size term.
        :param precomputed_truth: np array. If given, it will be used as the truth values for the evaluation.
        :return: float. Fitness value.
        '''
        if precomputed_truth is None:
            precomputed_truth = rules.compute_antecedents_memberships(ruleBase.antecedents, X)

        ev_object = evr.evalRuleBase(ruleBase, X, y, precomputed_truth=precomputed_truth)
        ev_object.add_full_evaluation()
        ruleBase.purge_rules(tolerance)

        if len(ruleBase.get_rules()) > 0:
            score_acc = ev_object.classification_eval()
            score_rules_size = ev_object.size_antecedents_eval(tolerance)
            score_nrules = ev_object.effective_rulesize_eval(tolerance)

            score = score_acc + score_rules_size * alpha + score_nrules * beta
        else:
            score = 0.0

        return score
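

# Illustrative sketch added for documentation purposes, not part of the original
# module: computes the expected gene length for a FitRuleBase configuration,
# following the GENE STRUCTURE documented in encode_rulebase and
# _construct_ruleBase. All parameter values here are arbitrary assumptions.
def _example_gene_length(n_features=4, n_rules=10, n_ants=3, n_lv=3,
                         t1=True, ds_mode=0, encode_mods=False):
    mf_size = 4 if t1 else 8                 # trapezoid parameters (8 for iv / t2 sets)
    size = 2 * n_ants * n_rules              # sections 1-2: antecedent indices + linguistic labels
    size += n_features * n_lv * mf_size      # section 3: membership parameters (only when lvs are optimized)
    size += n_rules                          # section 4: consequents
    if ds_mode == 2:
        size += n_rules                      # section 5: per-rule weights
    if encode_mods:
        size += n_ants * n_rules             # section 6: membership modifiers
    return size

# For 4 features, 10 rules, 3 antecedents and 3 labels per variable (t1 sets,
# memberships optimized): 2*3*10 + 4*3*4 + 10 = 118 genes, which is what
# FitRuleBase.single_gen_size holds for that configuration.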