"""
Evolutionary Optimization for Fuzzy Rule Base Learning
This module implements genetic algorithm-based optimization for learning fuzzy rule bases.
It provides automatic rule discovery, parameter tuning, and structure optimization for
fuzzy inference systems using evolutionary computation techniques.
Main Components:
- FitRuleBase: Core optimization problem class for genetic algorithms
- Fitness functions: Multiple objective functions for rule quality assessment
- Genetic operators: Specialized crossover, mutation, and selection for fuzzy rules
- Multi-objective optimization: Support for accuracy vs. complexity trade-offs
- Parallel evaluation: Efficient fitness evaluation using multiple threads
- Integration with Pymoo: Leverages the Pymoo optimization framework
The module supports automatic learning of:
- Rule antecedents (which variables and linguistic terms to use)
- Rule consequents (output class assignments)
- Rule structure (number of rules, complexity constraints)
- Membership function parameters (when combined with other modules)
Key Features:
- Stratified cross-validation for robust fitness evaluation
- Multiple fitness metrics (accuracy, MCC, F1-score, etc.)
- Support for Type-1, Type-2, and General Type-2 fuzzy systems
- Automatic handling of imbalanced datasets
- Configurable complexity penalties to avoid overfitting
"""
import os
from typing import Callable
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import matthews_corrcoef
from sklearn.base import ClassifierMixin
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import Problem
from pymoo.core.variable import Integer
from pymoo.parallelization.starmap import StarmapParallelization
# Import backend abstraction
try:
from . import evolutionary_backends as ev_backends
from . import fuzzy_sets as fs
from . import rules
from . import eval_rules as evr
from . import vis_rules
from .evolutionary_search import ExploreRuleBases
except ImportError:
import evolutionary_backends as ev_backends
import fuzzy_sets as fs
import rules
import eval_rules as evr
import vis_rules
from evolutionary_search import ExploreRuleBases
class BaseFuzzyRulesClassifier(ClassifierMixin):
'''
Classifier for a fuzzy rule-based system. Supports both precomputed linguistic variables and their optimization.
'''
def __init__(self, nRules: int = 30, nAnts: int = 4, fuzzy_type: fs.FUZZY_SETS = fs.FUZZY_SETS.t1, tolerance: float = 0.0, class_names: list[str] = None,
n_linguistic_variables: list[int]|int = 3, verbose=False, linguistic_variables: list[fs.fuzzyVariable] = None, categorical_mask: list[int] = None,
domain: list[float] = None, n_class: int=None, precomputed_rules: rules.MasterRuleBase=None, runner: int=1, ds_mode: int = 0, allow_unknown:bool=False, backend: str='pymoo') -> None:
'''
Inits the optimizer with the corresponding parameters.
:param nRules: number of rules to optimize.
:param nAnts: max number of antecedents to use.
:param fuzzy_type: FUZZY_SETS enum type in the fuzzy_sets module. The kind of fuzzy set used.
:param tolerance: tolerance for the dominance score of the rules.
:param class_names: list with the names of the classes. If None (default) the classifier will infer them from the data.
:param n_linguistic_variables: number of linguistic variables per antecedent. Either a single int applied to every variable or a list with one value per variable.
:param verbose: if True, prints the progress of the optimization.
:param linguistic_variables: list of fuzzyVariable objects. If None (default) the optimization process will initialize and optimize them.
:param categorical_mask: mask indicating which input variables are categorical (values greater than 0). If None (default) all variables are treated as numerical.
:param domain: list of the limits for each variable. If None (default) the classifier will compute them empirically.
:param n_class: number of classes in the problem. If None (default) the classifier will compute it empirically.
:param precomputed_rules: MasterRuleBase object. If not None, the classifier will use the rules in the object and ignore the conflicting parameters.
:param runner: number of threads to use for fitness evaluation. Default is 1 (single-threaded).
:param ds_mode: mode for the dominance score. 0: normal dominance score, 1: rules without weights, 2: weights optimized for each rule based on the data.
:param allow_unknown: if True, the classifier will allow the unknown class in the classification process. (Which would be a -1 value)
:param backend: evolutionary backend to use. Options: 'pymoo' (default, CPU) or 'evox' (GPU-accelerated). Install with: pip install ex-fuzzy[evox]
'''
if precomputed_rules is not None:
self.nRules = len(precomputed_rules.get_rules())
self.nAnts = len(precomputed_rules.get_rules()[0].antecedents)
self.n_class = len(precomputed_rules)
self.nclasses_ = len(precomputed_rules.consequent_names)
self.classes_names = precomputed_rules.consequent_names
self.rule_base = precomputed_rules
else:
self.nRules = nRules
self.nAnts = nAnts
self.nclasses_ = n_class
if not (class_names is None):
if isinstance(class_names, np.ndarray):
self.classes_names = list(class_names)
else:
self.classes_names = class_names
else:
self.classes_names = class_names
self.categorical_mask = categorical_mask
self.custom_loss = None
self.verbose = verbose
self.tolerance = tolerance
self.ds_mode = ds_mode
self.allow_unknown = allow_unknown
# Initialize evolutionary backend
try:
self.backend = ev_backends.get_backend(backend)
if verbose:
print(f"Using evolutionary backend: {self.backend.name()}")
except ValueError as e:
if verbose:
print(f"Warning: {e}. Falling back to pymoo backend.")
self.backend = ev_backends.get_backend('pymoo')
if runner > 1:
pool = ThreadPool(runner)
self.thread_runner = StarmapParallelization(pool.starmap)
else:
self.thread_runner = None
if linguistic_variables is not None:
# If the linguistic variables are precomputed then we act accordingly
self.lvs = linguistic_variables
self.n_linguist_variables = [len(lv.linguistic_variable_names()) for lv in self.lvs]
self.domain = None
self.fuzzy_type = self.lvs[0].fuzzy_type()
if self.nAnts > len(linguistic_variables):
self.nAnts = len(linguistic_variables)
if verbose:
print('Warning: The number of antecedents is higher than the number of variables. Setting nAnts to the number of linguistic variables. (' + str(len(linguistic_variables)) + ')')
else:
# Otherwise, we use the parameters supplied by the user.
self.lvs = None
self.fuzzy_type = fuzzy_type
self.n_linguist_variables = n_linguistic_variables
self.domain = domain
self.alpha_ = 0.0
self.beta_ = 0.0
def customized_loss(self, loss_function):
'''
Function to customize the loss function used for the optimization.
:param loss_function: function that takes as input the true labels and the predicted labels and returns a float.
:return: None
'''
self.custom_loss = loss_function
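# Hedged sketch of a custom loss per the docstring contract above (true labels and
# predicted labels in, a float out); `my_f1_loss` is a hypothetical name:
#
#   from sklearn.metrics import f1_score
#   def my_f1_loss(y_true, y_pred):
#       return f1_score(y_true, y_pred, average='macro')
#   classifier.customized_loss(my_f1_loss)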
def fit(self, X: np.array, y: np.array, n_gen:int=70, pop_size:int=30,
checkpoints:int=0, candidate_rules:rules.MasterRuleBase=None, initial_rules:rules.MasterRuleBase=None, random_state:int=33,
var_prob:float=0.3, sbx_eta:float=3.0, mutation_eta:float=7.0, tournament_size:int=3, bootstrap_size:int=1000, checkpoint_path:str='',
p_value_compute:bool=False, checkpoint_callback: Callable[[int, rules.MasterRuleBase], None] = None) -> None:
'''
Fits a fuzzy rule based classifier using a genetic algorithm to the given data.
:param X: numpy array samples x features
:param y: labels. integer array samples (x 1)
:param n_gen: integer. Number of generations to run the genetic algorithm.
:param pop_size: integer. Population size for each generation.
:param checkpoints: integer. Number of checkpoints to save the best rulebase found so far.
:param candidate_rules: if these rules exist, the optimization process will choose the best rules from this set. If None (default) the rules will be generated from scratch.
:param initial_rules: if these rules exist, the optimization process will start from this set. If None (default) the rules will be generated from scratch.
:param random_state: integer. Random seed for the optimization process.
:param var_prob: float. Probability of crossover for the genetic algorithm.
:param sbx_eta: float. Eta parameter for the SBX crossover.
:param checkpoint_path: string. Path to save the checkpoints. If None (default) the checkpoints will be saved in the current directory.
:param mutation_eta: float. Eta parameter for the polynomial mutation.
:param tournament_size: integer. Size of the tournament for the genetic algorithm.
:param bootstrap_size: integer. Number of bootstrap samples used when p_value_compute is True.
:param p_value_compute: boolean. If True, computes the permutation and bootstrapping p-values for the classifier and its rules after fitting.
:param checkpoint_callback: function. Callback executed at each checkpoint ('checkpoints' must be greater than 0); its arguments are the generation number and the rule base of that checkpoint.
:return: None. The classifier is fitted to the data.
'''
if isinstance(X, pd.DataFrame):
lvs_names = list(X.columns)
X = X.values
else:
lvs_names = [str(ix) for ix in range(X.shape[1])]
if self.classes_names is None:
self.classes_names = [aux for aux in np.unique(y)]
if self.nclasses_ is None:
self.nclasses_ = len(self.classes_names)
if isinstance(np.array(y)[0], str):
y = np.array([self.classes_names.index(str(aux)) for aux in y])
if candidate_rules is None:
if initial_rules is not None:
self.fuzzy_type = initial_rules.fuzzy_type()
self.n_linguist_variables = initial_rules.n_linguistic_variables()
self.domain = [fv.domain for fv in initial_rules[0].antecedents]
self.nRules = len(initial_rules.get_rules())
self.nAnts = len(initial_rules.get_rules()[0].antecedents)
if self.lvs is None:
# Check if self.n_linguist_variables is a list or a single value.
if isinstance(self.n_linguist_variables, int):
self.n_linguist_variables = [self.n_linguist_variables for _ in range(X.shape[1])]
if self.nAnts > X.shape[1]:
self.nAnts = X.shape[1]
if self.verbose:
print('Warning: The number of antecedents is higher than the number of variables. Setting nAnts to the number of variables. (' + str(X.shape[1]) + ')')
# If Fuzzy variables need to be optimized.
problem = FitRuleBase(X, y, nRules=self.nRules, nAnts=self.nAnts, tolerance=self.tolerance, n_classes=len(np.unique(y)),
n_linguistic_variables=self.n_linguist_variables, fuzzy_type=self.fuzzy_type, domain=self.domain, thread_runner=self.thread_runner,
alpha=self.alpha_, beta=self.beta_, ds_mode=self.ds_mode, categorical_mask=self.categorical_mask,
allow_unknown=self.allow_unknown, backend_name=self.backend.name())
else:
# If Fuzzy variables are already precomputed.
problem = FitRuleBase(X, y, nRules=self.nRules, nAnts=self.nAnts, n_classes=len(np.unique(y)),
linguistic_variables=self.lvs, domain=self.domain, tolerance=self.tolerance, thread_runner=self.thread_runner,
alpha=self.alpha_, beta=self.beta_, ds_mode=self.ds_mode,
allow_unknown=self.allow_unknown, backend_name=self.backend.name())
else:
self.fuzzy_type = candidate_rules.fuzzy_type()
self.n_linguist_variables = candidate_rules.n_linguistic_variables()
problem = ExploreRuleBases(X, y, n_classes=len(np.unique(y)), candidate_rules=candidate_rules, thread_runner=self.thread_runner, nRules=self.nRules)
if self.custom_loss is not None:
problem.fitness_func = self.custom_loss
# Prepare initial population
if initial_rules is None:
rules_gene = None # Will use default random sampling
else:
rules_gene = problem.encode_rulebase(initial_rules, self.lvs is None)
rules_gene = (np.ones((pop_size, len(rules_gene))) * rules_gene).astype(int)
# Use backend for optimization
if checkpoints > 0:
# Checkpoint mode - delegate to backend if supported
if self.backend.name() == 'pymoo':
# Define checkpoint handler
def handle_checkpoint(gen: int, best_individual: np.array):
rule_base = problem._construct_ruleBase(best_individual, self.fuzzy_type)
eval_performance = evr.evalRuleBase(rule_base, np.array(X), y)
eval_performance.add_full_evaluation()
rule_base.purge_rules(self.tolerance)
rule_base.rename_cons(self.classes_names)
checkpoint_rules = rule_base.print_rules(True, bootstrap_results=True)
if checkpoint_callback is None:
with open(os.path.join(checkpoint_path, "checkpoint_" + str(gen)), "w") as f:
f.write(checkpoint_rules)
else:
checkpoint_callback(gen, rule_base)
# Call backend's checkpoint optimization
result = self.backend.optimize_with_checkpoints(
problem=problem,
n_gen=n_gen,
pop_size=pop_size,
random_state=random_state,
verbose=self.verbose,
checkpoint_freq=checkpoints,
checkpoint_callback=handle_checkpoint,
var_prob=var_prob,
sbx_eta=sbx_eta,
mutation_eta=mutation_eta,
tournament_size=tournament_size,
sampling=rules_gene
)
best_individual = result['X']
self.performance = 1 - result['F']
else:
# EvoX or other backends: checkpoints not supported
if self.verbose:
print(f"Warning: Checkpoints are not yet supported with {self.backend.name()} backend. Running without checkpoints.")
result = self.backend.optimize(
problem=problem,
n_gen=n_gen,
pop_size=pop_size,
random_state=random_state,
verbose=self.verbose,
var_prob=var_prob,
sbx_eta=sbx_eta,
mutation_eta=mutation_eta,
tournament_size=tournament_size,
sampling=rules_gene
)
best_individual = result['X']
self.performance = 1 - result['F']
else:
# Normal optimization without checkpoints
result = self.backend.optimize(
problem=problem,
n_gen=n_gen,
pop_size=pop_size,
random_state=random_state,
verbose=self.verbose,
var_prob=var_prob,
sbx_eta=sbx_eta,
mutation_eta=mutation_eta,
tournament_size=tournament_size,
sampling=rules_gene
)
best_individual = result['X']
self.performance = 1 - result['F']
try:
self.var_names = list(X.columns)
self.X = X.values
except AttributeError:
self.X = X
self.var_names = [str(ix) for ix in range(X.shape[1])]
self.rule_base = problem._construct_ruleBase(
best_individual, self.fuzzy_type)
self.lvs = self.rule_base.rule_bases[0].antecedents if self.lvs is None else self.lvs
self.eval_performance = evr.evalRuleBase(
self.rule_base, np.array(X), y)
self.eval_performance.add_full_evaluation()
self.rule_base.purge_rules(self.tolerance)
self.eval_performance.add_full_evaluation() # After purging the bad rules we update the metrics.
if p_value_compute:
self.p_value_validation(bootstrap_size)
self.rule_base.rename_cons(self.classes_names)
if self.lvs is None:
self.rename_fuzzy_variables()
for ix, lv in enumerate(self.rule_base.rule_bases[0].antecedents):
lv.name = lvs_names[ix]
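# Illustrative sketch of a fit call that uses checkpoints with a custom callback
# (names `X_train`, `y_train` and `log_checkpoint` are hypothetical):
#
#   def log_checkpoint(gen, rule_base):
#       print(f"Generation {gen}: {len(rule_base.get_rules())} rules")
#
#   classifier.fit(X_train, y_train, n_gen=100, pop_size=50,
#                  checkpoints=10, checkpoint_callback=log_checkpoint)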
def print_rule_bootstrap_results(self) -> None:
'''
Prints the bootstrap results for each rule.
'''
self.rule_base.print_rule_bootstrap_results()
def p_value_validation(self, bootstrap_size:int=100):
'''
Computes the permutation and bootstrapping p-values for the classifier and its rules.
:param bootstrap_size: integer. Number of bootstrap samples to use.
'''
self.p_value_class_structure, self.p_value_feature_coalitions = self.eval_performance.p_permutation_classifier_validation()
self.eval_performance.p_bootstrapping_rules_validation(bootstrap_size)
def load_master_rule_base(self, rule_base: rules.MasterRuleBase) -> None:
'''
Loads a master rule base to be used in the prediction process.
:param rule_base: ruleBase object.
:return: None
'''
self.rule_base = rule_base
self.nRules = len(rule_base.get_rules())
self.nAnts = len(rule_base.get_rules()[0].antecedents)
self.nclasses_ = len(rule_base)
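# Illustrative sketch: reusing a previously obtained MasterRuleBase without re-fitting
# (`saved_rule_base` is a hypothetical, already-constructed rules.MasterRuleBase):
#
#   classifier = BaseFuzzyRulesClassifier()
#   classifier.load_master_rule_base(saved_rule_base)
#   predictions = classifier.predict(X)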
def explainable_predict(self, X: np.array, out_class_names=False) -> np.array:
'''
Returns the predicted class for each sample.
'''
return self.rule_base.explainable_predict(X, out_class_names=out_class_names)
def forward(self, X: np.array, out_class_names=False) -> np.array:
'''
Returns the predicted class for each sample.
:param X: np array samples x features.
:param out_class_names: if True, the output will be the class names instead of the class index.
:return: np array samples (x 1) with the predicted class.
'''
try:
X = X.values # If X was a pandas dataframe
except AttributeError:
pass
return self.rule_base.winning_rule_predict(X, out_class_names=out_class_names)
def predict(self, X: np.array, out_class_names=False) -> np.array:
'''
Returns the predicted class for each sample.
:param X: np array samples x features.
:param out_class_names: if True, the output will be the class names instead of the class index.
:return: np array samples (x 1) with the predicted class.
'''
return self.forward(X, out_class_names=out_class_names)
def predict_proba_rules(self, X: np.array, truth_degrees:bool=True) -> np.array:
'''
Returns the degree to which each rule fires for each sample.
:param X: np array samples x features.
:param truth_degrees: if True, the output will be the truth degrees of the rules. If false, will return the association degrees i.e. the truth degree multiplied by the weights/dominance of the rules. (depending on the inference mode chosen)
:return: np array samples x rules with the computed degrees.
'''
try:
X = X.values # If X was a pandas dataframe
except AttributeError:
pass
if truth_degrees:
return self.rule_base.compute_firing_strenghts(X)
else:
return self.rule_base.compute_association_degrees(X)
def predict_membership_class(self, X: np.array) -> np.array:
'''
Returns the predicted class memberships for each sample.
:param X: np array samples x features.
:return: np array samples x classes with the predicted class membership degrees (not normalized).
'''
try:
X = X.values # If X was a pandas dataframe
except AttributeError:
pass
rule_predict_proba = self.rule_base.compute_association_degrees(X)
rule_consequents = self.rule_base.get_consequents()
res = np.zeros((X.shape[0], self.nclasses_))
for jx in range(rule_predict_proba.shape[1]):
consequent = rule_consequents[jx]
res[:, consequent] = np.maximum(res[:, consequent], rule_predict_proba[:, jx])
return res
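# Worked toy example of the max-aggregation above: with two rules whose consequent is
# class 1 and association degrees [0.2, 0.7] for a given sample, the membership of
# that sample in class 1 is max(0.2, 0.7) = 0.7; classes with no firing rule stay at 0.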
def predict_proba(self, X:np.array) -> np.array:
'''
Returns the predicted class probabilities for each sample.
:param X: np array samples x features.
:return: np array samples x classes with the predicted class probabilities.
'''
beliefs = self.predict_membership_class(X)
beliefs = beliefs / np.sum(beliefs, axis=1, keepdims=True) # Normalize the beliefs to sum to 1
return beliefs
def print_rules(self, return_rules:bool=False, bootstrap_results:bool=False) -> None:
'''
Print the rules contained in the fitted rulebase.
'''
return self.rule_base.print_rules(return_rules, bootstrap_results)
def plot_fuzzy_variables(self) -> None:
'''
Plot the fuzzy partitions in each fuzzy variable.
'''
fuzzy_variables = self.rule_base.rule_bases[0].antecedents
for ix, fv in enumerate(fuzzy_variables):
vis_rules.plot_fuzzy_variable(fv)
def rename_fuzzy_variables(self) -> None:
'''
Renames the linguistic labels so that 'Low', 'High' and so on are ordered consistently with the fuzzy sets they denote. Usually called after an optimization process.
:return: None. Names are assigned according to the central point of the fuzzy memberships.
'''
for ix in range(len(self.rule_base)):
fuzzy_variables = self.rule_base.rule_bases[ix].antecedents
for jx, fv in enumerate(fuzzy_variables):
if fv[0].shape() != 'categorical':
new_order_values = []
possible_names = FitRuleBase.vl_names[self.n_linguist_variables[jx]]
for zx, fuzzy_set in enumerate(fv.linguistic_variables):
studied_fz = fuzzy_set.type()
if studied_fz == fs.FUZZY_SETS.temporal:
studied_fz = fuzzy_set.inside_type()
if studied_fz == fs.FUZZY_SETS.t1:
f1 = np.mean(
fuzzy_set.membership_parameters[0] + fuzzy_set.membership_parameters[1])
elif (studied_fz == fs.FUZZY_SETS.t2):
f1 = np.mean(
fuzzy_set.secondMF_upper[0] + fuzzy_set.secondMF_upper[1])
elif studied_fz == fs.FUZZY_SETS.gt2:
sec_memberships = fuzzy_set.secondary_memberships.values()
f1 = float(list(fuzzy_set.secondary_memberships.keys())[np.argmax(
[fzm.membership_parameters[2] for ix, fzm in enumerate(sec_memberships)])])
new_order_values.append(f1)
new_order = np.argsort(np.array(new_order_values))
fuzzy_sets_vl = fv.linguistic_variables
for jx, x in enumerate(new_order):
fuzzy_sets_vl[x].name = possible_names[jx]
def get_rulebase(self) -> list[np.array]:
'''
Get the rulebase obtained after fitting the classifier to the data.
:return: a matrix format for the rulebase.
'''
return self.rule_base.get_rulebase_matrix()
def reparametrice_loss(self, alpha:float, beta:float) -> None:
'''
Changes the parameters in the loss function.
:note: Does not check for convexity preservation. The user can play with these parameters as they wish.
:param alpha: controls the MCC term.
:param beta: controls the average rule size loss.
'''
self.alpha_ = alpha
self.beta_ = beta
def __call__(self, X:np.array) -> np.array:
'''
Returns the predicted class for each sample.
:param X: np array samples x features.
:return: np array samples (x 1) with the predicted class.
'''
return self.predict(X)
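# Illustrative sketch of the different prediction outputs (comment only; `X_test` is hypothetical):
#
#   hard_labels = classifier.predict(X_test)               # winning-rule class per sample
#   rule_degrees = classifier.predict_proba_rules(X_test)  # per-rule truth degrees
#   class_probs = classifier.predict_proba(X_test)         # normalized class memberships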
class FitRuleBase(Problem):
'''
Class that models the fitting of a rule base for a classification problem as a pymoo Problem, solved with evolutionary strategies.
Supports Type-1 and interval-valued (IV / Type-2) fuzzy sets.
'''
def _init_optimize_vl(self, fuzzy_type: fs.FUZZY_SETS, n_linguist_variables: int, domain: list[(float, float)] = None, categorical_variables: list[int] = None, X=None):
'''
Inits the corresponding fields if no linguistic partitions were given.
:param fuzzy_type: FUZZY_SETS enum type in the fuzzy_sets module. The kind of fuzzy set used.
:param n_linguistic_variables: number of linguistic variables per antecedent.
:param domain: list of the limits for each variable. If None (default) the classifier will compute them empirically.
'''
try:
from . import utils
except ImportError:
import utils
self.lvs = None
self.vl_names = [FitRuleBase.vl_names[n_linguist_variables[nn]] if n_linguist_variables[nn] < 6 else list(map(str, np.arange(n_linguist_variables[nn]))) for nn in range(len(n_linguist_variables))]  # Fall back to numeric labels when a variable has 6 or more partitions
self.fuzzy_type = fuzzy_type
self.domain = domain
self._precomputed_truth = None
self.categorical_mask = categorical_variables
self.categorical_boolean_mask = np.array(categorical_variables) > 0 if categorical_variables is not None else None
self.categorical_variables = {}
for ix, cat in enumerate(categorical_variables):
if cat > 0:
self.categorical_variables[ix] = utils.construct_crisp_categorical_partition(np.array(X)[:, ix], self.var_names[ix], fuzzy_type)
self.n_lv_possible = []
for ix in range(len(self.categorical_mask)):
if self.categorical_mask[ix] > 0:
self.n_lv_possible.append(len(self.categorical_variables[ix]))
else:
self.n_lv_possible.append(n_linguist_variables[ix])
def _init_precomputed_vl(self, linguist_variables: list[fs.fuzzyVariable], X: np.array):
'''
Inits the corresponding fields if linguistic partitions for each variable are given.
:param linguistic_variables: list of fuzzyVariables type.
:param X: np array samples x features.
'''
self.lvs = linguist_variables
self.vl_names = [lv.linguistic_variable_names() for lv in self.lvs]
self.n_lv_possible = [len(lv.linguistic_variable_names()) for lv in self.lvs]
self.fuzzy_type = self.lvs[0].fs_type
self.domain = None
self._precomputed_truth = rules.compute_antecedents_memberships(linguist_variables, X)
vl_names = [ # Linguistic variable names prenamed for some specific cases.
[],
[],
['Low', 'High'],
['Low', 'Medium', 'High'],
['Low', 'Medium', 'High', 'Very High'],
['Very Low', 'Low', 'Medium', 'High', 'Very High']
]
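# Example: FitRuleBase.vl_names[3] yields ['Low', 'Medium', 'High'], i.e. the entry at
# index n is the list of default linguistic labels used when a variable has n partitions
# (for 6 or more partitions numeric labels are generated instead, see _init_optimize_vl).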
def __init__(self, X: np.array, y: np.array, nRules: int, nAnts: int, n_classes: int, thread_runner: StarmapParallelization=None,
linguistic_variables:list[fs.fuzzyVariable]=None, n_linguistic_variables:int=3, fuzzy_type=fs.FUZZY_SETS.t1, domain:list=None, categorical_mask: np.array=None,
tolerance:float=0.01, alpha:float=0.0, beta:float=0.0, ds_mode: int =0, allow_unknown:bool=False, backend_name:str='pymoo') -> None:
'''
Constructor method. Initializes the problem with the number of antecedents, linguistic variables and the kind of fuzzy set desired.
:param X: np array or pandas dataframe samples x features.
:param y: np vector containing the target classes. vector sample
:param nRules: number of rules to optimize.
:param nAnts: max number of antecedents to use.
:param n_classes: number of classes in the problem. If None (as default) it will be computed from the data.
:param linguistic_variables: list of linguistic variables precomputed. If given, the rest of conflicting arguments are ignored.
:param n_linguistic_variables: number of linguistic variables per antecedent.
:param fuzzy_type: Define the fuzzy set or fuzzy set extension used as linguistic variable.
:param domain: list with the upper and lower domains of each input variable. If None (as default) it will establish the empirical min/max as the limits.
:param tolerance: float. Tolerance for the size evaluation.
:param alpha: float. Weight for the rulebase size term in the fitness function. (Penalizes number of rules)
:param beta: float. Weight for the average rule size term in the fitness function.
:param ds_mode: int. Mode for the dominance score. 0: normal dominance score, 1: rules without weights, 2: weights optimized for each rule based on the data.
:param allow_unknown: if True, the classifier will allow the unknown class in the classification process (encoded as a -1 value).
:param categorical_mask: mask indicating which input variables are categorical (values greater than 0). If None (default) all variables are treated as numerical.
:param thread_runner: StarmapParallelization object used for parallel fitness evaluation. If None (default) evaluation is single-threaded.
:param backend_name: name of the evolutionary backend in use ('pymoo' by default).
'''
try:
self.var_names = list(X.columns)
self.X = X.values
except AttributeError:
self.X = X
self.var_names = [str(ix) for ix in range(X.shape[1])]
self.tolerance = tolerance
self.y = y
self.classes_names = np.unique(y)
self.nRules = nRules
self.nAnts = nAnts
self.nCons = 1 # This is fixed to MISO rules.
self.ds_mode = ds_mode
self.allow_unknown = allow_unknown
if n_classes is not None:
self.n_classes = n_classes
else:
self.n_classes = len(np.unique(y))
if categorical_mask is None:
self.categorical_mask = np.zeros(X.shape[1])
categorical_mask = self.categorical_mask
if linguistic_variables is not None:
self._init_precomputed_vl(linguistic_variables, X)
else:
if isinstance(n_linguistic_variables, int):
n_linguistic_variables = [n_linguistic_variables] * self.X.shape[1]
self._init_optimize_vl(
fuzzy_type=fuzzy_type, n_linguist_variables=n_linguistic_variables, categorical_variables=categorical_mask, domain=domain, X=X)
if self.domain is None:
# If all the variables are numerical, then we can compute the min/max of the domain.
if np.all([np.issubdtype(self.X[:, ix].dtype, np.number) for ix in range(self.X.shape[1])]):
self.min_bounds = np.min(self.X, axis=0)
self.max_bounds = np.max(self.X, axis=0)
else:
self.min_bounds = np.zeros(self.X.shape[1])
self.max_bounds = np.zeros(self.X.shape[1])
for ix in range(self.X.shape[1]):
if np.issubdtype(self.X[:, ix].dtype, np.number):
self.min_bounds[ix] = np.min(self.X[:, ix])
self.max_bounds[ix] = np.max(self.X[:, ix])
else:
self.min_bounds[ix] = 0
self.max_bounds[ix] = len(np.unique(self.X[:, ix][~pd.isna(self.X[:, ix])]))
else:
self.min_bounds, self.max_bounds = self.domain
self.antecedents_referencial = [np.linspace(
self.min_bounds[ix], self.max_bounds[ix], 100) for ix in range(self.X.shape[1])]
possible_antecedent_bounds = np.array(
[[0, self.X.shape[1] - 1]] * self.nAnts * self.nRules)
vl_antecedent_bounds = np.array(
[[-1, self.n_lv_possible[ax] - 1] for ax in range(self.nAnts)] * self.nRules) # -1 means not caring
antecedent_bounds = np.concatenate(
(possible_antecedent_bounds, vl_antecedent_bounds))
vars_antecedent = {ix: Integer(
bounds=antecedent_bounds[ix]) for ix in range(len(antecedent_bounds))}
aux_counter = len(vars_antecedent)
if self.lvs is None:
self.feature_domain_bounds = np.array(
[[0, 99] for ix in range(self.X.shape[1])])
if self.fuzzy_type == fs.FUZZY_SETS.t1:
correct_size = [(self.n_lv_possible[ixx]-1) * 4 + 3 for ixx in range(len(self.n_lv_possible))]
elif self.fuzzy_type == fs.FUZZY_SETS.t2:
correct_size = [(self.n_lv_possible[ixx]-1) * 6 + 2 for ixx in range(len(self.n_lv_possible))]
membership_bounds = np.concatenate(
[[self.feature_domain_bounds[ixx]] * correct_size[ixx] for ixx in range(len(self.n_lv_possible))])
vars_memberships = {
aux_counter + ix: Integer(bounds=membership_bounds[ix]) for ix in range(len(membership_bounds))}
aux_counter += len(vars_memberships)
final_consequent_bounds = np.array(
[[-1, self.n_classes - 1]] * self.nRules)
vars_consequent = {aux_counter + ix: Integer(
bounds=final_consequent_bounds[ix]) for ix in range(len(final_consequent_bounds))}
if self.lvs is None:
vars = {key: val for d in [
vars_antecedent, vars_memberships, vars_consequent] for key, val in d.items()}
varbound = np.concatenate(
(antecedent_bounds, membership_bounds, final_consequent_bounds), axis=0)
else:
vars = {key: val for d in [vars_antecedent,
vars_consequent] for key, val in d.items()}
varbound = np.concatenate(
(antecedent_bounds, final_consequent_bounds), axis=0)
if self.ds_mode == 2:
weights_bounds = np.array([[0, 99] for ix in range(self.nRules)])
vars_weights = {max(vars.keys()) + 1 + ix: Integer(
bounds=weights_bounds[ix]) for ix in range(len(weights_bounds))}
vars = {key: val for d in [vars, vars_weights] for key, val in d.items()}
varbound = np.concatenate((varbound, weights_bounds), axis=0)
nVar = len(varbound)
self.single_gen_size = nVar
self.alpha_ = alpha
self.beta_ = beta
self.backend_name = backend_name
if thread_runner is not None:
super().__init__(
vars=vars,
n_var=nVar,
n_obj=1,
elementwise=True,
vtype=int,
xl=varbound[:, 0],
xu=varbound[:, 1],
elementwise_runner=thread_runner)
else:
super().__init__(
vars=vars,
n_var=nVar,
n_obj=1,
elementwise=True,
vtype=int,
xl=varbound[:, 0],
xu=varbound[:, 1])
def _decode_membership_functions(self, x: np.array, fuzzy_type: fs.FUZZY_SETS) -> list[fs.fuzzyVariable]:
"""
Decode membership function parameters from gene encoding.
:param x: gene array containing encoded membership function parameters
:param fuzzy_type: type of fuzzy set (t1 or t2)
:return: list of fuzzyVariable objects with decoded membership functions
"""
third_pointer = 2 * self.nAnts * self.nRules
aux_pointer = 0
antecedents = []
for fuzzy_variable in range(self.X.shape[1]):
linguistic_variables = []
lv_FS = []
for lx in range(self.n_lv_possible[fuzzy_variable]):
parameter_pointer = third_pointer + aux_pointer
if fuzzy_type == fs.FUZZY_SETS.t1:
if lx == 0:
fz_parameters_idx0 = x[parameter_pointer]
fz_parameters_idx1 = x[parameter_pointer + 1]
fz_parameters_idx2 = x[parameter_pointer + 2]
fz_parameters_idx3 = x[parameter_pointer + 3]
fz0 = fz_parameters_idx0
fz1 = fz_parameters_idx0
fz2 = fz1 + fz_parameters_idx1
next_fz0 = fz2 + fz_parameters_idx2
fz3 = next_fz0 + fz_parameters_idx3
fz_parameters = np.array([fz0, fz1, fz2, fz3])
aux_pointer += 4
elif lx == self.n_lv_possible[fuzzy_variable] - 1:
fz_parameters_idx1 = x[parameter_pointer]
fz_parameters_idx2 = x[parameter_pointer + 1]
fz0 = next_fz0
fz1 = fz3 + fz_parameters_idx1
fz2 = fz1 + fz_parameters_idx2
fz3 = fz2
fz_parameters = np.array([fz0, fz1, fz2, fz3])
aux_pointer += 3
else:
fz_parameters_idx1 = x[parameter_pointer]
fz_parameters_idx2 = x[parameter_pointer + 1]
fz_parameters_idx3 = x[parameter_pointer + 2]
fz_parameters_idx4 = x[parameter_pointer + 3]
fz0 = next_fz0
fz1 = fz3 + fz_parameters_idx1
fz2 = fz1 + fz_parameters_idx2
next_fz0 = fz2 + fz_parameters_idx3
fz3 = next_fz0 + fz_parameters_idx4
aux_pointer += 4
fz_parameters = np.array([fz0, fz1, fz2, fz3])
lv_FS.append(fz_parameters)
elif fuzzy_type == fs.FUZZY_SETS.t2:
if lx == 0:
fz_parameters_idx0 = x[parameter_pointer]
fz_parameters_idx1 = x[parameter_pointer + 1]
fz_parameters_idx2 = x[parameter_pointer + 2]
fz_parameters_idx3 = x[parameter_pointer + 3]
fz_parameters_idx4 = x[parameter_pointer + 4]
fz_parameters_idx5 = x[parameter_pointer + 5]
l_fz0 = fz_parameters_idx0
l_fz1 = l_fz0
l_fz2 = l_fz1 + fz_parameters_idx1
next_ufz0 = l_fz2 + fz_parameters_idx2
next_lfz0 = next_ufz0 + fz_parameters_idx3
l_fz3 = next_lfz0 + fz_parameters_idx4
u_fz0 = l_fz0
u_fz1 = u_fz0
u_fz2 = l_fz2
u_fz3 = l_fz3 + fz_parameters_idx5
l_fz_parameters = np.array([l_fz0, l_fz1, l_fz2, l_fz3])
u_fz_parameters = np.array([u_fz0, u_fz1, u_fz2, u_fz3])
next_init = l_fz2 + fz_parameters_idx4
aux_pointer += 6
elif lx == self.n_lv_possible[fuzzy_variable] - 1:
fz_parameters_idx0 = x[parameter_pointer]
fz_parameters_idx1 = x[parameter_pointer + 1]
u_fz0 = next_ufz0
l_fz0 = next_lfz0
u_fz1 = u_fz3 + fz_parameters_idx0
l_fz1 = u_fz1
u_fz2 = l_fz1 + fz_parameters_idx1
l_fz2 = u_fz2
l_fz3 = l_fz2
u_fz3 = l_fz3
l_fz_parameters = np.array([l_fz0, l_fz1, l_fz2, l_fz3])
u_fz_parameters = np.array([u_fz0, u_fz1, u_fz2, u_fz3])
aux_pointer += 2
else:
fz_parameters_idx0 = x[parameter_pointer]
fz_parameters_idx1 = x[parameter_pointer + 1]
fz_parameters_idx2 = x[parameter_pointer + 2]
fz_parameters_idx3 = x[parameter_pointer + 3]
fz_parameters_idx4 = x[parameter_pointer + 4]
fz_parameters_idx5 = x[parameter_pointer + 5]
u_fz0 = next_ufz0
l_fz0 = next_lfz0
l_fz1 = u_fz3 + fz_parameters_idx0
u_fz1 = l_fz1
l_fz2 = l_fz1 + fz_parameters_idx1
u_fz2 = l_fz2
next_ufz0 = l_fz2 + fz_parameters_idx2
next_lfz0 = next_ufz0 + fz_parameters_idx3
l_fz3 = next_lfz0 + fz_parameters_idx4
u_fz3 = l_fz3 + fz_parameters_idx5
l_fz_parameters = np.array([l_fz0, l_fz1, l_fz2, l_fz3])
u_fz_parameters = np.array([u_fz0, u_fz1, u_fz2, u_fz3])
aux_pointer += 6
lv_FS.append((l_fz_parameters, u_fz_parameters))
# Build fuzzy variable from the decoded parameters
if self.categorical_boolean_mask is not None and self.categorical_boolean_mask[fuzzy_variable]:
linguistic_variable = self.categorical_variables[fuzzy_variable]
else:
for lx, relevant_lv in enumerate(lv_FS):
if fuzzy_type == fs.FUZZY_SETS.t1:
proper_FS = fs.FS(self.vl_names[fuzzy_variable][lx], relevant_lv, None)
elif fuzzy_type == fs.FUZZY_SETS.t2:
proper_FS = fs.IVFS(self.vl_names[fuzzy_variable][lx], relevant_lv[0], relevant_lv[1], None)
linguistic_variables.append(proper_FS)
linguistic_variable = fs.fuzzyVariable(self.var_names[fuzzy_variable], linguistic_variables)
antecedents.append(linguistic_variable)
return antecedents
def encode_rulebase(self, rule_base: rules.MasterRuleBase, optimize_lv: bool) -> np.array:
'''
Given a rule base, constructs the corresponding gene associated with that rule base.
GENE STRUCTURE
First: antecedents chosen by each rule. Size: nAnts * nRules (index of the antecedent)
Second: Variable linguistics used. Size: nAnts * nRules
Third: Parameters for the fuzzy partitions of each variable (only present when memberships are optimized). Size depends on the number of features, the linguistic variables per feature and the fuzzy type (4 trapezoidal parameters per membership for T1, 8 for T2).
Four: Consequent classes. Size: nRules
:param rule_base: rule base object.
:param optimize_lv: if True, the gene is prepared to optimize the membership functions.
:return: np array of size self.single_gen_size.
'''
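# Worked example of the first, second and fourth gene sections (comment only):
# with nRules=4 and nAnts=3, genes [0:12] hold the antecedent (feature) indices,
# genes [12:24] hold the linguistic-term indices, and the last section holds the
# 4 consequent classes; the membership-parameter section in between only exists
# when optimize_lv is True (its length depends on n_lv_possible and the fuzzy type).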
gene = np.zeros((self.single_gen_size,))
n_lv_possible = len(rule_base.rule_bases[0].antecedents[0].linguistic_variables)
fuzzy_type = rule_base.fuzzy_type()
rule_consequents = rule_base.get_consequents()
nreal_rules = len(rule_consequents)
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 8
# Pointer to the fourth section of the gene: consequents
if optimize_lv:
# If lv memberships are optimized.
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-2) * mf_size
else:
# If no memberships are optimized.
fourth_pointer = 2 * self.nAnts * self.nRules
# Pointer to the fifth section of the gene: weights (if they exist)
fifth_pointer = fourth_pointer + self.nRules
if rule_base.ds_mode == 2:
for ix, rule in enumerate(rule_base.get_rules()):
gene[fifth_pointer + ix] = rule.weight
# First and second sections of the gene: antecedents and linguistic variables
for i0, rule in enumerate(rule_base.get_rules()): # Reconstruct the rules
first_pointer = i0 * self.nAnts
second_pointer = (self.nRules * self.nAnts) + i0 * self.nAnts
for ax, linguistic_variable in enumerate(rule.antecedents):
gene[first_pointer + ax] = ax
gene[second_pointer + ax] = linguistic_variable
# Update the fourth section of the gene: consequents using the fourth pointer
gene[fourth_pointer + i0] = rule_consequents[i0]
# Fill the rest of the rules with don't care values
nvoid_rules = self.nRules - nreal_rules
for vx in range(nvoid_rules):
first_pointer = nreal_rules * self.nAnts + vx * self.nAnts
second_pointer = (self.nRules * self.nAnts) + nreal_rules * self.nAnts + vx * self.nAnts
for ax, linguistic_variable in enumerate(rule.antecedents):
gene[first_pointer + ax] = ax
gene[second_pointer + ax] = -1
# Update the fourth section of the gene: consequents using the fourth pointer
gene[fourth_pointer + nreal_rules + vx] = -1
if optimize_lv:
# If lv memberships are optimized.
third_pointer = 2 * self.nAnts * self.nRules
aux_pointer = 0
for ix, fuzzy_variable in enumerate(rule_base.get_antecedents()):
for linguistic_variable in range(n_lv_possible):
fz_parameters = fuzzy_variable[linguistic_variable].membership_parameters
for jx, fz_parameter in enumerate(fz_parameters):
closest_idx = (np.abs(np.asarray(self.antecedents_referencial[ix]) - fz_parameter)).argmin()
gene[third_pointer + aux_pointer] = closest_idx
aux_pointer += 1
return np.array(list(map(int, gene)))
def _construct_ruleBase(self, x: np.array, fuzzy_type: fs.FUZZY_SETS, **kwargs) -> rules.MasterRuleBase:
'''
Given a subject, it creates a rulebase according to its specification.
:param x: gene of a rule base. Either a dict (pymoo variable encoding) or a np.array.
:param fuzzy_type: an enum type. Check fuzzy_sets for the complete specification (two fields, t1 and t2, to mark which fuzzy set you want to use).
:param kwargs: additional parameters to pass to the rule
:return: a rulebase object.
kwargs:
- time_moment: if temporal fuzzy sets are used with different partitions for each time interval,
then this parameter is used to specify which time moment is being used.
'''
rule_list = [[] for _ in range(self.n_classes)]
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
'''
GENE STRUCTURE
First: features chosen by each rule. Size: nAnts * nRules
Second: linguistic labels used. Size: nAnts * nRules
Third: Parameters for the fuzzy partitions of the chosen variables. Size: X.shape[1] * ((self.n_linguistic_variables-1) * mf_size + 2)
Four: Consequent classes. Size: nRules
Five: Weights for each rule. Size: nRules (only if ds_mode == 2)
Sixth: Modifiers for the membership functions. Size: len(self.lvs) * nAnts * nRules
'''
if self.lvs is None:
# If memberships are optimized.
if fuzzy_type == fs.FUZZY_SETS.t1:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + sum(np.array(self.n_lv_possible)-1) * 4 # 4 is the size of the membership function, 3 is the size of the first (and last) membership function
elif fuzzy_type == fs.FUZZY_SETS.t2:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-1) * mf_size
else:
# If no memberships are optimized.
fourth_pointer = 2 * self.nAnts * self.nRules
if self.ds_mode == 2:
fifth_pointer = fourth_pointer + self.nRules
else:
fifth_pointer = fourth_pointer
if self.ds_mode == 2:
sixth_pointer = fifth_pointer + self.nRules
else:
sixth_pointer = fifth_pointer
aux_pointer = 0
min_domain = np.zeros(self.X.shape[1])
max_domain = np.zeros(self.X.shape[1])
# Handle mixed data types (numerical and string columns)
for ix in range(self.X.shape[1]):
if np.issubdtype(self.X[:, ix].dtype, np.number):
# For numerical columns, use nanmin/nanmax
min_domain[ix] = np.nanmin(self.X[:, ix])
max_domain[ix] = np.nanmax(self.X[:, ix])
else:
# For string/categorical columns, use 0 and number of unique values
min_domain[ix] = 0
max_domain[ix] = len(np.unique(self.X[:, ix][~pd.isna(self.X[:, ix])]))
range_domain = np.zeros((self.X.shape[1],))
for ix in range(self.X.shape[1]):
try:
range_domain[ix] = max_domain[ix] - min_domain[ix]
except TypeError:
pass
# Integer sampling does not always behave as expected in pymoo, so we cast explicitly (which is effectively what pymoo does internally when integer optimization is set).
try:
# subject might come as a dict.
x = np.array(list(x.values())).astype(int)
except AttributeError:
x = x.astype(int)
for i0 in range(self.nRules): # Reconstruct the rules
first_pointer = i0 * self.nAnts
chosen_ants = x[first_pointer:first_pointer + self.nAnts]
second_pointer = (i0 * self.nAnts) + (self.nAnts * self.nRules)
# Shape: self.nAnts + self.n_lv_possible + 1
antecedent_parameters = x[second_pointer:second_pointer+self.nAnts]
init_rule_antecedents = np.zeros(
(self.X.shape[1],)) - 1 # -1 is dont care
for jx, ant in enumerate(chosen_ants):
if self.lvs is not None:
antecedent_parameters[jx] = min(antecedent_parameters[jx], len(self.lvs[ant]) - 1)
else:
antecedent_parameters[jx] = min(antecedent_parameters[jx], self.n_lv_possible[ant] - 1)
init_rule_antecedents[ant] = antecedent_parameters[jx]
consequent_idx = x[fourth_pointer + aux_pointer]
assert consequent_idx < self.n_classes, "Consequent class is not valid. Something in the gene is wrong."
aux_pointer += 1
if self.ds_mode == 2:
rule_weight = x[fifth_pointer + i0] / 100
else:
rule_weight = 1.0
if consequent_idx != -1 and np.any(init_rule_antecedents != -1):
rs_instance = rules.RuleSimple(init_rule_antecedents, 0, None)
if self.ds_mode == 1 or self.ds_mode == 2:
rs_instance.weight = rule_weight
rule_list[consequent_idx].append(
rs_instance)
# If we optimize the membership functions - decode and normalize them
if self.lvs is None:
antecedents_raw = self._decode_membership_functions(x, fuzzy_type)
# Normalize the membership functions to the data domain
antecedents = []
for fuzzy_variable, fv_raw in enumerate(antecedents_raw):
if self.categorical_boolean_mask is not None and self.categorical_boolean_mask[fuzzy_variable]:
antecedents.append(fv_raw)
else:
# Extract raw parameters and normalize
lv_FS = [lv.membership_parameters for lv in fv_raw.linguistic_variables]
min_lv = np.min(np.array(lv_FS))
max_lv = np.max(np.array(lv_FS))
linguistic_variables = []
for lx, relevant_lv in enumerate(lv_FS):
relevant_lv = (relevant_lv - min_lv) / (max_lv - min_lv) * range_domain[fuzzy_variable] + min_domain[fuzzy_variable]
if fuzzy_type == fs.FUZZY_SETS.t1:
proper_FS = fs.FS(self.vl_names[fuzzy_variable][lx], relevant_lv, (min_domain[fuzzy_variable], max_domain[fuzzy_variable]))
elif fuzzy_type == fs.FUZZY_SETS.t2:
proper_FS = fs.IVFS(self.vl_names[fuzzy_variable][lx], relevant_lv[0], relevant_lv[1], (min_domain[fuzzy_variable], max_domain[fuzzy_variable]))
linguistic_variables.append(proper_FS)
linguistic_variable = fs.fuzzyVariable(self.var_names[fuzzy_variable], linguistic_variables)
antecedents.append(linguistic_variable)
else:
try:
antecedents = self.lvs[kwargs['time_moment']]
except (KeyError, IndexError, TypeError):
antecedents = self.lvs
for i in range(self.n_classes):
if fuzzy_type == fs.FUZZY_SETS.temporal:
fuzzy_type = self.lvs[0][0].inside_type()
if fuzzy_type == fs.FUZZY_SETS.t1:
rule_base = rules.RuleBaseT1(antecedents, rule_list[i])
elif fuzzy_type == fs.FUZZY_SETS.t2:
rule_base = rules.RuleBaseT2(antecedents, rule_list[i])
elif fuzzy_type == fs.FUZZY_SETS.gt2:
rule_base = rules.RuleBaseGT2(antecedents, rule_list[i])
if i == 0:
res = rules.MasterRuleBase([rule_base], self.classes_names, ds_mode=self.ds_mode, allow_unknown=self.allow_unknown)
else:
res.add_rule_base(rule_base)
res.rename_cons(self.classes_names)
return res
def _evaluate_numpy_fast(self, x: np.array, y: np.array, fuzzy_type: fs.FUZZY_SETS, **kwargs) -> float:
'''
Memory-efficient vectorized numpy evaluation with automatic sample batching.
Automatically batches samples when dataset is too large to fit in memory.
Uses indicator matrices for efficient computation within each batch.
:param x: gene of a rule base. Either a dict (pymoo variable encoding) or a np.array.
:param fuzzy_type: an enum type. Check fuzzy_sets for the complete specification (two fields, t1 and t2, to mark which fuzzy set you want to use).
:param kwargs: additional parameters to pass to the rule
:return: the MCC of the hypothetical rulebase.
kwargs:
- time_moment: if temporal fuzzy sets are used with different partitions for each time interval,
then this parameter is used to specify which time moment is being used.
'''
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
n_samples = self.X.shape[0]
# Estimate memory usage and determine if we need sample batching
max_lvars = max(self.n_lv_possible)
n_features = self.X.shape[1]
# Estimate bytes for key arrays
membership_array_bytes = n_samples * n_features * max_lvars * 8 # float64
indicators_bytes = n_features * max_lvars * self.nRules * self.nAnts * 8
einsum_result_bytes = n_samples * self.nRules * self.nAnts * 8
total_estimated_bytes = membership_array_bytes + indicators_bytes + einsum_result_bytes
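# Worked example of the estimate above (comment only): with 50,000 samples, 30 features,
# max_lvars=5, nRules=30 and nAnts=4, the membership array needs ~60 MB
# (50,000*30*5*8 B), the einsum result ~48 MB and the indicators ~0.14 MB,
# so the full-vectorized path is chosen against the 2 GB fallback budget.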
# Get available memory
try:
import psutil
available_memory = psutil.virtual_memory().available
# Use at most 30% of available memory for safety
memory_budget = available_memory * 0.3
except ImportError:
# Conservative default: 2GB
memory_budget = 2 * 1024**3
# If estimated usage exceeds budget, use sample batching
if total_estimated_bytes > memory_budget:
return self._evaluate_numpy_fast_batched(x, y, fuzzy_type, memory_budget, **kwargs)
else:
return self._evaluate_numpy_fast_full(x, y, fuzzy_type, **kwargs)
def _evaluate_numpy_fast_full(self, x: np.array, y: np.array, fuzzy_type: fs.FUZZY_SETS, **kwargs) -> float:
'''
Full vectorized evaluation - processes all samples at once.
Only called when dataset fits comfortably in memory.
:param x: gene array
:param y: labels
:param fuzzy_type: fuzzy set type
:return: MCC score
'''
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
# Calculate pointers
if self.lvs is None:
if fuzzy_type == fs.FUZZY_SETS.t1:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + sum(np.array(self.n_lv_possible)-1) * 4
elif fuzzy_type == fs.FUZZY_SETS.t2:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-1) * mf_size
else:
fourth_pointer = 2 * self.nAnts * self.nRules
if self.ds_mode == 2:
fifth_pointer = fourth_pointer + self.nRules
else:
fifth_pointer = fourth_pointer
# Get precomputed memberships
if self.lvs is None:
antecedents = self._decode_membership_functions(x, fuzzy_type)
precomputed_antecedent_memberships = rules.compute_antecedents_memberships(antecedents, self.X)
else:
precomputed_antecedent_memberships = self._precomputed_truth
# Convert x to int array
try:
x = np.array(list(x.values())).astype(int)
except AttributeError:
x = x.astype(int)
# VECTORIZED APPROACH - extract all gene segments at once
n_samples = self.X.shape[0]
n_features = self.X.shape[1]
# Extract features and parameters for all rules: (nRules, nAnts)
chosen_ants = x[:self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
ant_params = x[self.nAnts * self.nRules:2 * self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
# Clamp parameters to valid ranges (vectorized)
for feat_idx in range(n_features):
mask = chosen_ants == feat_idx
if self.lvs is not None:
max_param = len(self.lvs[feat_idx]) - 1
else:
max_param = self.n_lv_possible[feat_idx] - 1
ant_params = np.where(mask, np.minimum(ant_params, max_param), ant_params)
# Build unified membership tensor: (n_samples, n_features, max_lvars)
# Rebuild on every call when memberships are being optimized (self.lvs is None); cache otherwise.
if self.lvs is None or not hasattr(self, '_precomputed_membership_array'):
max_lvars = max(self.n_lv_possible)
membership_array = np.zeros((n_samples, n_features, max_lvars))
for feat_idx in range(n_features):
feat_memberships = precomputed_antecedent_memberships[feat_idx] # (n_lvars, n_samples)
n_lvars = feat_memberships.shape[0]
membership_array[:, feat_idx, :n_lvars] = feat_memberships.T
self._precomputed_membership_array = membership_array
self._max_lvars_numpy = max_lvars
membership_array = self._precomputed_membership_array
# Create indicator matrix: (n_features, max_lvars, nRules, nAnts)
indicators = np.zeros((n_features, self._max_lvars_numpy, self.nRules, self.nAnts))
# Build indicators using advanced indexing
rule_indices = np.arange(self.nRules)[:, None] # (nRules, 1)
ant_indices = np.arange(self.nAnts)[None, :] # (1, nAnts)
indicators[chosen_ants, ant_params, rule_indices, ant_indices] = 1.0
# Compute memberships for all antecedents: (n_samples, nRules, nAnts)
# membership_array: (n_samples, n_features, max_lvars)
# indicators: (n_features, max_lvars, nRules, nAnts)
ant_memberships = np.einsum('sfl,flra->sra', membership_array, indicators)
# Product over antecedents to get rule memberships: (n_samples, nRules)
rule_memberships = np.prod(ant_memberships, axis=2)
# Get consequents
rule_class_consequents = x[fourth_pointer:fourth_pointer + self.nRules].astype(int)
# Apply weights if needed (vectorized)
if self.ds_mode == 2:
rule_weights = x[fifth_pointer:fifth_pointer + self.nRules] / 100.0
rule_memberships = rule_memberships * rule_weights[np.newaxis, :]
# One-hot encoding and matrix multiplication
rule_onehot = np.eye(self.n_classes)[rule_class_consequents] # (nRules, n_classes)
memberships_pred = rule_memberships @ rule_onehot # (n_samples, nRules) @ (nRules, n_classes)
predicted_classes = np.argmax(memberships_pred, axis=1)
return matthews_corrcoef(y, predicted_classes)
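# Note on the indicator/einsum trick used above (comment only): membership_array[s, f, l]
# stores the degree of sample s in term l of feature f, while indicators[f, l, r, a] is 1.0
# exactly when rule r uses term l of feature f as its a-th antecedent. The contraction
# np.einsum('sfl,flra->sra', ...) therefore selects, per (sample, rule, antecedent), the
# membership of the chosen (feature, term) pair, and np.prod(..., axis=2) combines the
# antecedents of each rule with the product t-norm.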
def _evaluate_numpy_fast_batched(self, x: np.array, y: np.array, fuzzy_type: fs.FUZZY_SETS, memory_budget: float, **kwargs) -> float:
'''
Memory-efficient evaluation that processes samples in batches.
:param x: gene array
:param y: labels
:param fuzzy_type: fuzzy set type
:param memory_budget: available memory in bytes
:return: MCC score
'''
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
# Calculate pointers (same as full version)
if self.lvs is None:
if fuzzy_type == fs.FUZZY_SETS.t1:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + sum(np.array(self.n_lv_possible)-1) * 4
elif fuzzy_type == fs.FUZZY_SETS.t2:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-1) * mf_size
else:
fourth_pointer = 2 * self.nAnts * self.nRules
if self.ds_mode == 2:
fifth_pointer = fourth_pointer + self.nRules
else:
fifth_pointer = fourth_pointer
# Get precomputed memberships
if self.lvs is None:
antecedents = self._decode_membership_functions(x, fuzzy_type)
precomputed_antecedent_memberships = rules.compute_antecedents_memberships(antecedents, self.X)
else:
precomputed_antecedent_memberships = self._precomputed_truth
# Convert x to int array
try:
x = np.array(list(x.values())).astype(int)
except AttributeError:
x = x.astype(int)
n_samples = self.X.shape[0]
n_features = self.X.shape[1]
max_lvars = max(self.n_lv_possible)
# Calculate optimal batch size based on memory budget
indicators_bytes = n_features * max_lvars * self.nRules * self.nAnts * 8
per_sample_bytes = n_features * max_lvars * 8 + self.nRules * self.nAnts * 8
available_for_samples = memory_budget - indicators_bytes
sample_batch_size = max(100, int(available_for_samples / per_sample_bytes))
sample_batch_size = min(sample_batch_size, n_samples)
# Extract and clamp gene segments (same as full version)
chosen_ants = x[:self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
ant_params = x[self.nAnts * self.nRules:2 * self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
for feat_idx in range(n_features):
mask = chosen_ants == feat_idx
if self.lvs is not None:
max_param = len(self.lvs[feat_idx]) - 1
else:
max_param = self.n_lv_possible[feat_idx] - 1
ant_params = np.where(mask, np.minimum(ant_params, max_param), ant_params)
# Create indicator matrix once (shared across batches)
indicators = np.zeros((n_features, max_lvars, self.nRules, self.nAnts))
rule_indices = np.arange(self.nRules)[:, None]
ant_indices = np.arange(self.nAnts)[None, :]
indicators[chosen_ants, ant_params, rule_indices, ant_indices] = 1.0
# Get consequents and weights
rule_class_consequents = x[fourth_pointer:fourth_pointer + self.nRules].astype(int)
if self.ds_mode == 2:
rule_weights = x[fifth_pointer:fifth_pointer + self.nRules] / 100.0
# Process samples in batches
all_predictions = []
for batch_start in range(0, n_samples, sample_batch_size):
batch_end = min(batch_start + sample_batch_size, n_samples)
# Build membership array for this batch
batch_size = batch_end - batch_start
membership_array_batch = np.zeros((batch_size, n_features, max_lvars))
for feat_idx in range(n_features):
feat_memberships = precomputed_antecedent_memberships[feat_idx][:, batch_start:batch_end]
n_lvars = feat_memberships.shape[0]
membership_array_batch[:, feat_idx, :n_lvars] = feat_memberships.T
# Compute for this batch
ant_memberships = np.einsum('sfl,flra->sra', membership_array_batch, indicators)
rule_memberships_batch = np.prod(ant_memberships, axis=2)
# Apply weights if needed
if self.ds_mode == 2:
rule_memberships_batch = rule_memberships_batch * rule_weights[np.newaxis, :]
# Predict for this batch
rule_onehot = np.eye(self.n_classes)[rule_class_consequents]
memberships_pred = rule_memberships_batch @ rule_onehot
predicted_classes_batch = np.argmax(memberships_pred, axis=1)
all_predictions.append(predicted_classes_batch)
# Concatenate all predictions
predicted_classes = np.concatenate(all_predictions)
return matthews_corrcoef(y, predicted_classes)
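# Worked example of the batch sizing above (comment only): with 30 features, max_lvars=5,
# nRules=30 and nAnts=4, per_sample_bytes = 30*5*8 + 30*4*8 = 2,160 B, so a ~600 MB
# budget (after subtracting the shared indicator matrix) allows batches of roughly
# 280,000 samples, with a floor of 100 samples per batch.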
def _evaluate_torch_fast(self, x, y, fuzzy_type: fs.FUZZY_SETS, device='cuda', return_tensor=False, **kwargs):
'''
PyTorch GPU-accelerated version of _evaluate_numpy_fast.
:param x: gene tensor (can be numpy array or torch tensor)
:param y: labels tensor (can be numpy array or torch tensor)
:param fuzzy_type: enum type for fuzzy set type
:param device: device to run computation on ('cuda' or 'cpu')
:param return_tensor: if True, return predictions as torch tensor; if False, return MCC as float
:param kwargs: additional parameters
:return: Matthews correlation coefficient (as Python float) or prediction tensor
'''
try:
import torch
except ImportError:
raise ImportError("PyTorch is required for _evaluate_torch_fast. Install with: pip install torch")
# Convert inputs to torch tensors if needed
if not isinstance(x, torch.Tensor):
x = torch.from_numpy(x).to(device)
else:
x = x.to(device)
if not isinstance(y, torch.Tensor):
y_torch = torch.from_numpy(y).long().to(device)
else:
y_torch = y.long().to(device)
# Convert training data to torch if not already done
if not hasattr(self, 'X_torch') or self.X_torch is None:
self.X_torch = torch.from_numpy(self.X).float().to(device)
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
rule_memberships = torch.zeros((self.X.shape[0], self.nRules), device=device)
rule_class_consequents = torch.zeros((self.nRules,), dtype=torch.long, device=device)
# Calculate pointers (same logic as numpy version)
if self.lvs is None:
if fuzzy_type == fs.FUZZY_SETS.t1:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + sum(np.array(self.n_lv_possible)-1) * 4
elif fuzzy_type == fs.FUZZY_SETS.t2:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-1) * mf_size
else:
fourth_pointer = 2 * self.nAnts * self.nRules
if self.ds_mode == 2:
fifth_pointer = fourth_pointer + self.nRules
else:
fifth_pointer = fourth_pointer
aux_pointer = 0
# If we optimize membership functions - decode them
if self.lvs is None:
# Convert to numpy for decoding (membership decoding is complex, keep in numpy)
x_np = x.cpu().numpy() if isinstance(x, torch.Tensor) else x
antecedents = self._decode_membership_functions(x_np, fuzzy_type)
# Compute memberships and convert to torch
precomputed_antecedent_memberships_np = rules.compute_antecedents_memberships(antecedents, self.X)
# Convert to torch tensors
precomputed_antecedent_memberships = [
torch.from_numpy(ant_mems).float().to(device)
for ant_mems in precomputed_antecedent_memberships_np
]
else:
if not hasattr(self, '_precomputed_truth_torch') or self._precomputed_truth_torch is None:
# Convert precomputed truth to torch once
self._precomputed_truth_torch = [
torch.from_numpy(ant_mems).float().to(device)
for ant_mems in self._precomputed_truth
]
precomputed_antecedent_memberships = self._precomputed_truth_torch
# Ensure x is integer type
x = x.long()
# VECTORIZED RULE RECONSTRUCTION - extract all at once using slicing
chosen_ants_all = x[:self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
ant_params_all = x[self.nAnts * self.nRules:2 * self.nAnts * self.nRules].reshape(self.nRules, self.nAnts)
# Clamp parameters to valid range (vectorized)
if self.lvs is not None:
for ant_idx in range(self.X.shape[1]):
mask = chosen_ants_all == ant_idx
ant_params_all = torch.where(mask, torch.clamp(ant_params_all, max=len(self.lvs[ant_idx]) - 1), ant_params_all)
else:
for ant_idx in range(self.X.shape[1]):
mask = chosen_ants_all == ant_idx
ant_params_all = torch.where(mask, torch.clamp(ant_params_all, max=self.n_lv_possible[ant_idx] - 1), ant_params_all)
# Initialize with ones and multiply memberships
rule_memberships = torch.ones((self.X.shape[0], self.nRules), device=device)
for rule_idx in range(self.nRules):
for ant_idx in range(self.nAnts):
feature = chosen_ants_all[rule_idx, ant_idx].item()
param = ant_params_all[rule_idx, ant_idx].item()
rule_memberships[:, rule_idx] *= precomputed_antecedent_memberships[feature][param, :]
# Get consequents (vectorized)
rule_class_consequents = x[fourth_pointer:fourth_pointer + self.nRules]
# Apply weights if needed (vectorized)
if self.ds_mode == 2:
rule_weights = x[fifth_pointer:fifth_pointer + self.nRules].float() / 100.0
rule_memberships = rule_memberships * rule_weights.unsqueeze(0)
# One-hot encoding and matrix multiplication
rule_onehot = torch.nn.functional.one_hot(rule_class_consequents, num_classes=self.n_classes).float()
memberships_pred = rule_memberships @ rule_onehot # (n_samples, nRules) @ (nRules, n_classes)
predicted_classes = torch.argmax(memberships_pred, dim=1)
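# Illustrative note (hypothetical numbers): with 3 rules whose consequents are [0, 1, 1] and a sample
# whose rule memberships are [0.2, 0.7, 0.4], the one-hot product above yields class scores
# [0.2, 0.7 + 0.4] = [0.2, 1.1], so the argmax predicts class 1.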
if return_tensor:
# Return predictions directly as tensor (for EvoX backend)
return predicted_classes, y_torch
else:
# Compute MCC using sklearn (convert back to numpy)
y_pred_np = predicted_classes.cpu().numpy()
y_true_np = y_torch.cpu().numpy()
return matthews_corrcoef(y_true_np, y_pred_np)
def _evaluate_torch_batch(self, population, y, fuzzy_type: fs.FUZZY_SETS, device='cuda', batch_size=None, **kwargs):
'''
Memory-efficient batch evaluation: processes the population in smaller batches.
Automatically determines optimal batch size based on available memory if not specified.
Uses a simpler loop-based approach to avoid creating massive 6D tensors.
:param population: population tensor (pop_size, n_var)
:param y: labels tensor (can be numpy array or torch tensor)
:param fuzzy_type: enum type for fuzzy set type
:param device: device to run computation on ('cuda' or 'cpu')
:param batch_size: number of individuals to process at once (if None, auto-computed)
:return: MCC scores for entire population (pop_size,)
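Example (illustrative sketch; ``problem`` is a hypothetical, already-initialized instance of this
class with precomputed linguistic variables, and ``pop`` a (pop_size, n_var) array)::

    scores = problem._evaluate_torch_batch(pop, problem.y, problem.fuzzy_type, device='cpu')
    objectives = 1.0 - scores.cpu().numpy()  # minimization objective, as in _evaluate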
'''
try:
import torch
except ImportError:
raise ImportError("PyTorch is required for batched evaluation. Install with: pip install torch")
# Auto-compute batch size if not provided
if batch_size is None:
batch_size = self._compute_optimal_batch_size(population.shape[0], device)
# Convert y to torch if needed
if not isinstance(y, torch.Tensor):
y_torch = torch.from_numpy(y).long().to(device)
else:
y_torch = y.long().to(device)
# Ensure population is on device
if not isinstance(population, torch.Tensor):
population = torch.from_numpy(population).to(device)
else:
population = population.to(device)
pop_size = population.shape[0]
n_samples = self.X.shape[0]
n_features = self.X.shape[1]
# Cache precomputed memberships as list of tensors
if not hasattr(self, '_precomputed_truth_torch') or self._precomputed_truth_torch is None:
if self.lvs is not None:
self._precomputed_truth_torch = [
torch.from_numpy(self._precomputed_truth[feat_idx]).float().to(device)
for feat_idx in range(n_features)
]
precomputed_memberships = self._precomputed_truth_torch
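# NOTE: the cache above is only built when self.lvs is not None; this batched evaluation path
# therefore expects the antecedent memberships to be precomputed beforehand.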
# Calculate pointers
mf_size = 4 if fuzzy_type == fs.FUZZY_SETS.t1 else 6
if self.lvs is None:
if fuzzy_type == fs.FUZZY_SETS.t1:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 3 + sum(np.array(self.n_lv_possible)-1) * 4
elif fuzzy_type == fs.FUZZY_SETS.t2:
fourth_pointer = 2 * self.nAnts * self.nRules + \
len(self.n_lv_possible) * 2 + sum(np.array(self.n_lv_possible)-1) * mf_size
else:
fourth_pointer = 2 * self.nAnts * self.nRules
if self.ds_mode == 2:
fifth_pointer = fourth_pointer + self.nRules
# Process population in batches
all_mcc_scores = []
for batch_start in range(0, pop_size, batch_size):
batch_end = min(batch_start + batch_size, pop_size)
batch_pop = population[batch_start:batch_end]
batch_pop_size = batch_pop.shape[0]
# Extract gene segments for this batch
chosen_ants_batch = batch_pop[:, :self.nAnts * self.nRules].reshape(batch_pop_size, self.nRules, self.nAnts).long()
ant_params_batch = batch_pop[:, self.nAnts * self.nRules:2 * self.nAnts * self.nRules].reshape(batch_pop_size, self.nRules, self.nAnts).long()
# Clamp parameters to valid ranges
for feat_idx in range(n_features):
mask = chosen_ants_batch == feat_idx
max_param = self.n_lv_possible[feat_idx] - 1
ant_params_batch = torch.where(mask, torch.clamp(ant_params_batch, max=max_param), ant_params_batch)
# Compute rule memberships for this batch
# Shape: (batch_pop_size, n_samples, nRules)
rule_memberships_batch = torch.ones((batch_pop_size, n_samples, self.nRules), device=device)
# For each individual in batch, each rule, and each antecedent
for ind_idx in range(batch_pop_size):
for rule_idx in range(self.nRules):
for ant_idx in range(self.nAnts):
feature = chosen_ants_batch[ind_idx, rule_idx, ant_idx].item()
param = ant_params_batch[ind_idx, rule_idx, ant_idx].item()
# Multiply membership values: (n_samples,)
rule_memberships_batch[ind_idx, :, rule_idx] *= precomputed_memberships[feature][param, :]
# Get consequents for this batch: (batch_pop_size, nRules)
rule_consequents_batch = batch_pop[:, fourth_pointer:fourth_pointer + self.nRules].long()
# Apply weights if needed
if self.ds_mode == 2:
rule_weights_batch = batch_pop[:, fifth_pointer:fifth_pointer + self.nRules].float() / 100.0
rule_memberships_batch = rule_memberships_batch * rule_weights_batch[:, None, :]
# Handle unknown classes (-1 -> n_classes)
rule_consequents_batch = torch.where(rule_consequents_batch == -1, self.n_classes, rule_consequents_batch)
# Compute predictions for this batch
# One-hot encode consequents: (batch_pop_size, nRules, n_classes+1)
rule_onehot_batch = torch.nn.functional.one_hot(rule_consequents_batch, num_classes=self.n_classes + 1).float()
# Aggregate memberships by class: (batch_pop_size, n_samples, n_classes+1)
memberships_pred_batch = torch.einsum('bsr,brc->bsc', rule_memberships_batch, rule_onehot_batch)
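# The einsum above is equivalent to, for each individual b, sample s and class c:
#   memberships_pred_batch[b, s, c] = sum_r rule_memberships_batch[b, s, r] * rule_onehot_batch[b, r, c]
# i.e. a per-individual (n_samples, nRules) @ (nRules, n_classes+1) matrix product.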
# Get predicted classes: (batch_pop_size, n_samples)
predicted_classes_batch = torch.argmax(memberships_pred_batch, dim=2)
# Compute MCC for each individual in batch
for ind_idx in range(batch_pop_size):
mcc = self._compute_mcc_torch_fast(predicted_classes_batch[ind_idx], y_torch)
all_mcc_scores.append(mcc)
return torch.tensor(all_mcc_scores, dtype=torch.float32, device=device)
def _compute_optimal_batch_size(self, pop_size, device='cuda'):
'''
Automatically compute optimal batch size based on available memory and problem size.
:param pop_size: population size
:param device: device ('cuda' or 'cpu')
:return: optimal batch size
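Illustrative estimate (hypothetical numbers): with n_samples = 10000 and nRules = 30, one individual
needs about 10000 * 30 * 4 bytes = 1.2 MB for its membership tensor, tripled to ~3.6 MB to cover
intermediates; roughly 2 GB of usable memory would then allow a batch of about 550-600 individuals
(before rounding and clamping to the population size).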
'''
try:
import torch
except ImportError:
return 10 # Default fallback
n_samples = self.X.shape[0]
# Estimate memory per individual in batch
# Main tensors: rule_memberships_batch (batch_size, n_samples, nRules)
# Additional overhead for intermediate computations
bytes_per_individual = n_samples * self.nRules * 4 # 4 bytes per float32
bytes_per_individual *= 3 # Account for intermediate tensors and overhead
if device == 'cuda' and torch.cuda.is_available():
try:
# Get available GPU memory (leave 20% buffer for safety)
gpu_props = torch.cuda.get_device_properties(0)
total_memory = gpu_props.total_memory
allocated_memory = torch.cuda.memory_allocated(0)
reserved_memory = torch.cuda.memory_reserved(0)
available_memory = total_memory - max(allocated_memory, reserved_memory)
# Use 60% of available memory for batching (conservative)
usable_memory = available_memory * 0.6
# Calculate batch size
estimated_batch_size = int(usable_memory / bytes_per_individual)
# Clamp to reasonable range [1, pop_size]
batch_size = max(1, min(estimated_batch_size, pop_size))
# Round down to nice numbers for better memory alignment
if batch_size > 20:
batch_size = (batch_size // 10) * 10
elif batch_size > 5:
batch_size = (batch_size // 5) * 5
except Exception:
# If memory query fails, use conservative default
batch_size = min(10, pop_size)
else:
# For CPU, use more aggressive batching since RAM is usually more plentiful
# But also consider that CPU is slower, so moderate batch sizes work well
try:
import psutil
available_ram = psutil.virtual_memory().available
usable_ram = available_ram * 0.4 # Use 40% of available RAM
estimated_batch_size = int(usable_ram / bytes_per_individual)
batch_size = max(5, min(estimated_batch_size, pop_size))
# Round to nice numbers
if batch_size > 20:
batch_size = (batch_size // 10) * 10
elif batch_size > 10:
batch_size = (batch_size // 5) * 5
except ImportError:
# psutil not available, use conservative default
batch_size = min(15, pop_size)
return batch_size
def _compute_mcc_torch_fast(self, y_pred, y_true):
'''Fast tensor-based MCC computation using the confusion matrix; runs on the device of the inputs.'''
import torch
y_pred = y_pred.long()
y_true = y_true.long()
classes = torch.unique(torch.cat([y_true, y_pred]))
n_classes = len(classes)
if n_classes == 1:
return 0.0
# Compute confusion matrix
confusion = torch.zeros((n_classes, n_classes), dtype=torch.float32, device=y_pred.device)
for i, c in enumerate(classes):
for j, k in enumerate(classes):
confusion[i, j] = torch.sum((y_true == c) & (y_pred == k)).float()
t_sum = confusion.sum()
pred_sum = confusion.sum(dim=0)
true_sum = confusion.sum(dim=1)
diag_sum = torch.trace(confusion)
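# Multiclass MCC as in sklearn: with c = trace, s = total count, p_k = predicted count and
# t_k = true count of class k,
#   MCC = (c*s - sum_k p_k*t_k) / sqrt((s^2 - sum_k p_k^2) * (s^2 - sum_k t_k^2))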
cov_ytyp = diag_sum * t_sum - torch.sum(pred_sum * true_sum)
cov_ypyp = t_sum * t_sum - torch.sum(pred_sum * pred_sum)
cov_ytyt = t_sum * t_sum - torch.sum(true_sum * true_sum)
if cov_ypyp == 0 or cov_ytyt == 0:
return 0.0
mcc = cov_ytyp / torch.sqrt(cov_ypyp * cov_ytyt)
return float(mcc.item())
def _evaluate_slow(self, x: np.array, out: dict, *args, **kwargs):
'''
Evaluates a single candidate solution by explicitly reconstructing its rule base.
:param x: gene array encoding one candidate rule base; each position is a decision variable of the optimization problem.
:param out: dict whose "F" field receives the objective value; it is read by the optimizer.
'''
ruleBase = self._construct_ruleBase(x, self.fuzzy_type)
if len(ruleBase.get_rules()) > 0:
score = self.fitness_func(ruleBase, self.X, self.y, self.tolerance, self.alpha_, self.beta_, self._precomputed_truth)
else:
score = 0.0
out["F"] = 1 - score
def _evaluate(self, x: np.array, out: dict, *args, **kwargs):
'''
Faster version of the evaluation function: instead of reconstructing the rule base for every candidate,
it computes a functionally equivalent score with vectorized numpy operations, which saves considerable time.
:param x: gene array encoding one candidate rule base; each position is a decision variable of the optimization problem.
:param out: dict whose "F" field receives the objective value; it is read by the optimizer.
'''
score = self._evaluate_numpy_fast(x, self.y, self.fuzzy_type)
out["F"] = 1 - score
def fitness_func(self, ruleBase: rules.RuleBase, X:np.array, y:np.array, tolerance:float, alpha:float=0.0, beta:float=0.0, precomputed_truth:np.array=None) -> float:
'''
Fitness function for the optimization problem.
:param ruleBase: RuleBase object
:param X: array of train samples. X shape = (n_samples, n_features)
:param y: array of train labels. y shape = (n_samples,)
:param tolerance: float. Tolerance for the size evaluation.
:param alpha: float. Weight for the antecedent-size (rule length) score term.
:param beta: float. Weight for the effective number of rules score term.
:param precomputed_truth: np array. If given, it will be used as the truth values for the evaluation.
:return: float. Fitness value.
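Example (illustrative sketch; ``problem`` is a hypothetical, already-initialized instance of this class
and ``rb`` an existing rule base)::

    score = problem.fitness_func(rb, X_train, y_train, tolerance=0.01)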
'''
if precomputed_truth is None:
precomputed_truth = rules.compute_antecedents_memberships(ruleBase.antecedents, X)
ev_object = evr.evalRuleBase(ruleBase, X, y, precomputed_truth=precomputed_truth)
ev_object.add_full_evaluation()
ruleBase.purge_rules(tolerance)
if len(ruleBase.get_rules()) > 0:
score_acc = ev_object.classification_eval()
score_rules_size = ev_object.size_antecedents_eval(tolerance)
score_nrules = ev_object.effective_rulesize_eval(tolerance)
score = score_acc + score_rules_size * alpha + score_nrules * beta
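# Aggregate fitness: classification score plus the antecedent-size term weighted by alpha
# and the effective-rule-count term weighted by beta.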
else:
score = 0.0
return score