"""
Conformal Prediction for Fuzzy Rule-Based Classifiers
Provides prediction sets with coverage guarantees using split conformal prediction.
Supports both class-level and rule-level conformal prediction for explainable uncertainty.
Main Components:
- ConformalFuzzyClassifier: Main classifier with conformal prediction
- evaluate_conformal_coverage: Evaluation utility for coverage metrics
Key Features:
- Wrap existing trained classifiers or create from scratch
- Class-level conformal prediction (standard)
- Rule-wise conformal prediction (explainable uncertainty)
- Multiple nonconformity score strategies
Example:
>>> from ex_fuzzy.conformal import ConformalFuzzyClassifier
>>> # Wrap existing classifier
>>> conf_clf = ConformalFuzzyClassifier(trained_clf)
>>> conf_clf.calibrate(X_cal, y_cal)
>>> pred_sets = conf_clf.predict_set(X_test, alpha=0.1)
>>> # Or create from scratch
>>> conf_clf = ConformalFuzzyClassifier(nRules=20, nAnts=4)
>>> conf_clf.fit(X_train, y_train, X_cal, y_cal, n_gen=50)
"""
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from typing import Union, Optional, List, Dict
try:
from . import fuzzy_sets as fs
from . import evolutionary_fit as evf
from . import rules
except ImportError:
import fuzzy_sets as fs
import evolutionary_fit as evf
import rules
class ConformalFuzzyClassifier(ClassifierMixin, BaseEstimator):
"""
Conformal prediction wrapper for fuzzy rule-based classifiers.
Can wrap an existing BaseFuzzyRulesClassifier or create one internally
using the same parameter signature. Provides prediction sets with
statistical coverage guarantees.
Parameters
----------
    clf_or_nRules : BaseFuzzyRulesClassifier or int, optional
        Either a trained classifier to wrap, or the nRules parameter for
        creating a new one. If None, nRules may instead be passed as a
        keyword argument (default 30).
score_type : str, default='membership'
Nonconformity score type:
- 'membership': 1 - membership degree of true class (default)
- 'association': 1 - max association degree for rules of true class
- 'entropy': Entropy of class probability distribution
**kwargs : dict
Additional parameters passed to BaseFuzzyRulesClassifier if creating new.
Common parameters include: nAnts, fuzzy_type, n_linguistic_variables,
tolerance, verbose, backend, etc.
Attributes
----------
clf : BaseFuzzyRulesClassifier
The underlying fuzzy classifier
score_type : str
The type of nonconformity score being used
rule_base : MasterRuleBase
The rule base from the underlying classifier (property)
nclasses_ : int
Number of classes (property)
Examples
--------
Wrap an existing trained classifier:
>>> from ex_fuzzy.evolutionary_fit import BaseFuzzyRulesClassifier
>>> from ex_fuzzy.conformal import ConformalFuzzyClassifier
>>>
>>> clf = BaseFuzzyRulesClassifier(nRules=20)
>>> clf.fit(X_train, y_train, n_gen=50)
>>>
>>> conf_clf = ConformalFuzzyClassifier(clf)
>>> conf_clf.calibrate(X_cal, y_cal)
>>> pred_sets = conf_clf.predict_set(X_test, alpha=0.1)
Create classifier from scratch:
>>> conf_clf = ConformalFuzzyClassifier(nRules=20, nAnts=4)
>>> conf_clf.fit(X_train, y_train, X_cal, y_cal, n_gen=50)
>>> pred_sets = conf_clf.predict_set(X_test, alpha=0.1)
Get rule-wise explanations:
>>> results = conf_clf.predict_set_with_rules(X_test, alpha=0.1)
>>> for r in results:
... print(f"Prediction set: {r['prediction_set']}")
... print(f"Rule contributions: {r['rule_contributions']}")
"""
def __init__(self, clf_or_nRules: Union[evf.BaseFuzzyRulesClassifier, int, None] = None,
score_type: str = 'membership',
**kwargs):
if isinstance(clf_or_nRules, evf.BaseFuzzyRulesClassifier):
# Wrapper mode - use existing classifier
self.clf = clf_or_nRules
self._owns_clf = False
else:
# Creation mode - create new classifier
nRules = clf_or_nRules if clf_or_nRules is not None else kwargs.pop('nRules', 30)
self.clf = evf.BaseFuzzyRulesClassifier(nRules=nRules, **kwargs)
self._owns_clf = True
if score_type not in ('membership', 'association', 'entropy'):
raise ValueError(f"Unknown score_type: {score_type}. "
f"Expected one of: 'membership', 'association', 'entropy'")
self.score_type = score_type
self._calibrated = False
self._calibration_scores = None
self._thresholds = None # Per-class thresholds
self._rule_calibration = None # Per-rule thresholds (for rule-wise)
def fit(self, X: np.ndarray, y: np.ndarray,
X_cal: np.ndarray = None, y_cal: np.ndarray = None,
cal_size: float = 0.2,
**fit_kwargs) -> 'ConformalFuzzyClassifier':
"""
Fit the classifier and calibrate for conformal prediction.
If X_cal/y_cal not provided, automatically splits X/y using cal_size.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data
y : array-like of shape (n_samples,)
Training labels
X_cal : array-like of shape (n_cal_samples, n_features), optional
Calibration data. If None, split from X using cal_size.
y_cal : array-like of shape (n_cal_samples,), optional
Calibration labels. If None, split from y using cal_size.
cal_size : float, default=0.2
Fraction of data to use for calibration if X_cal not provided
**fit_kwargs : dict
Parameters passed to clf.fit() (n_gen, pop_size, etc.)
Returns
-------
self : ConformalFuzzyClassifier
The fitted conformal classifier
Examples
--------
>>> conf_clf = ConformalFuzzyClassifier(nRules=20)
>>> # Auto-split for calibration
>>> conf_clf.fit(X, y, cal_size=0.2, n_gen=50)
>>> # Or provide explicit calibration set
>>> conf_clf.fit(X_train, y_train, X_cal, y_cal, n_gen=50)
"""
X = np.asarray(X)
y = np.asarray(y)
if X_cal is None:
# Split data for calibration
X, X_cal, y, y_cal = self._split_calibration(X, y, cal_size)
else:
X_cal = np.asarray(X_cal)
y_cal = np.asarray(y_cal)
if self._owns_clf:
self.clf.fit(X, y, **fit_kwargs)
self.calibrate(X_cal, y_cal)
return self
def calibrate(self, X_cal: np.ndarray, y_cal: np.ndarray) -> 'ConformalFuzzyClassifier':
"""
Calibrate conformal prediction thresholds using calibration set.
Computes nonconformity scores on the calibration set and stores
them for computing p-values during prediction.
Parameters
----------
X_cal : array-like of shape (n_cal_samples, n_features)
Calibration samples
y_cal : array-like of shape (n_cal_samples,)
Calibration labels
Returns
-------
self : ConformalFuzzyClassifier
The calibrated conformal classifier
Raises
------
ValueError
If the underlying classifier has not been fitted yet
Examples
--------
>>> # After fitting the base classifier separately
>>> conf_clf = ConformalFuzzyClassifier(trained_clf)
>>> conf_clf.calibrate(X_cal, y_cal)
"""
X_cal = np.asarray(X_cal)
y_cal = np.asarray(y_cal)
# Check that the classifier is fitted
if not hasattr(self.clf, 'rule_base') or self.clf.rule_base is None:
raise ValueError("Classifier not fitted. Either wrap a trained classifier "
"or use fit() instead of calibrate().")
# Compute nonconformity scores for calibration samples
scores = self._compute_nonconformity_scores(X_cal, y_cal)
# Store scores grouped by true class
self._calibration_scores = {}
unique_classes = np.unique(y_cal)
for class_idx in unique_classes:
mask = y_cal == class_idx
self._calibration_scores[int(class_idx)] = scores[mask]
# Compute rule-wise calibration if applicable
self._calibrate_rules(X_cal, y_cal)
self._calibrated = True
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""
Standard point prediction (delegates to wrapped classifier).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples
Returns
-------
predictions : ndarray of shape (n_samples,)
Predicted class labels
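
        Examples
        --------
        >>> y_pred = conf_clf.predict(X_test)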
"""
return self.clf.predict(X)
def predict_set(self, X: np.ndarray, alpha: float = 0.1) -> List[set]:
"""
        Predict conformal sets with coverage guarantee 1-alpha.

        Returns prediction sets that, under the exchangeability assumption,
        contain the true label with probability at least 1-alpha. Because
        calibration scores are grouped by true class (Mondrian conformal
        prediction), the guarantee also holds conditionally on each class.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples
alpha : float, default=0.1
Significance level. The coverage guarantee is 1-alpha.
For example, alpha=0.1 gives 90% coverage.
Returns
-------
        prediction_sets : list of sets
            For each sample, the set of class indices whose conformal p-value
            exceeds alpha. An empty set means the sample is atypical for every
            class; a set with multiple classes indicates an ambiguous
            prediction.
Raises
------
ValueError
If the classifier is not calibrated
Examples
--------
>>> pred_sets = conf_clf.predict_set(X_test, alpha=0.1)
>>> for i, pred_set in enumerate(pred_sets):
... if len(pred_set) == 1:
... print(f"Sample {i}: confident prediction {pred_set}")
... elif len(pred_set) > 1:
... print(f"Sample {i}: ambiguous between {pred_set}")
... else:
... print(f"Sample {i}: high uncertainty (empty set)")
"""
if not self._calibrated:
raise ValueError("Classifier not calibrated. Call calibrate() or fit() first.")
X = np.asarray(X)
n_samples = X.shape[0]
n_classes = self.clf.nclasses_
        # Nonconformity score for every candidate class, using the same
        # score function as calibration (see _candidate_scores)
        cand_scores = self._candidate_scores(X)  # (n_samples, n_classes)
        prediction_sets = []
        for i in range(n_samples):
            pred_set = set()
            for c in range(n_classes):
                # Compute p-value for class c
                score = cand_scores[i, c]
                p_value = self._compute_p_value(score, c)
if p_value > alpha:
pred_set.add(c)
prediction_sets.append(pred_set)
return prediction_sets
def predict_set_with_rules(self, X: np.ndarray, alpha: float = 0.1) -> List[Dict]:
"""
Predict conformal sets with rule-level explanations.
Returns prediction sets along with which rules contribute
and their individual confidence levels. This provides
explainable uncertainty quantification.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples
alpha : float, default=0.1
Significance level (0.1 = 90% coverage)
Returns
-------
results : list of dicts
Each result dict includes keys:
'prediction_set' (set of class indices),
'rule_contributions' (list of per-rule contribution dicts with
'rule_index', 'class', 'firing_strength', and 'rule_confidence'),
and 'class_p_values' (dict mapping class index to p-value).
Raises
------
ValueError
If the classifier is not calibrated
Examples
--------
>>> results = conf_clf.predict_set_with_rules(X_test[:5], alpha=0.1)
>>> for i, r in enumerate(results):
... print(f"Sample {i}:")
... print(f" Prediction set: {r['prediction_set']}")
... print(f" P-values: {r['class_p_values']}")
... for contrib in r['rule_contributions'][:3]:
... print(f" Rule {contrib['rule_index']}: "
... f"class={contrib['class']}, "
... f"strength={contrib['firing_strength']:.3f}")
"""
if not self._calibrated:
raise ValueError("Classifier not calibrated. Call calibrate() or fit() first.")
X = np.asarray(X)
# Get per-rule association degrees
rule_scores = self.clf.predict_proba_rules(X, truth_degrees=False)
rule_consequents = self.clf.rule_base.get_consequents()
        n_samples = X.shape[0]
        # Nonconformity scores for every candidate class (same score
        # function as used during calibration)
        cand_scores = self._candidate_scores(X)
        results = []
for i in range(n_samples):
result = {
'prediction_set': set(),
'rule_contributions': [],
'class_p_values': {}
}
            # Compute p-values per class
            for c in range(self.clf.nclasses_):
                score = cand_scores[i, c]
p_value = self._compute_p_value(score, c)
result['class_p_values'][c] = p_value
if p_value > alpha:
result['prediction_set'].add(c)
# Get rule contributions for classes in prediction set
for rule_idx, consequent in enumerate(rule_consequents):
if consequent in result['prediction_set']:
rule_score = rule_scores[i, rule_idx]
if rule_score > 0:
result['rule_contributions'].append({
'rule_index': rule_idx,
'class': consequent,
'firing_strength': float(rule_score),
'rule_confidence': self._get_rule_confidence(rule_idx, rule_score)
})
# Sort by firing strength (highest first)
result['rule_contributions'].sort(key=lambda x: x['firing_strength'], reverse=True)
results.append(result)
return results
    def _compute_nonconformity_scores(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Compute nonconformity scores of the true classes, based on score_type."""
        y = np.asarray(y).astype(int)
        return self._candidate_scores(X)[np.arange(len(y)), y]

    def _candidate_scores(self, X: np.ndarray) -> np.ndarray:
        """Compute the nonconformity score of every (sample, candidate class) pair.

        Prediction must use the same score function as calibration for the
        conformal p-values to be valid, so both paths share this implementation.
        """
        n_classes = self.clf.nclasses_
        if self.score_type == 'membership':
            # 1 - membership degree of the candidate class
            return 1 - self.clf.predict_membership_class(X)
        elif self.score_type == 'association':
            # 1 - max association degree among the rules of the candidate class
            rule_scores = self.clf.predict_proba_rules(X, truth_degrees=False)
            consequents = self.clf.rule_base.get_consequents()
            scores = np.ones((X.shape[0], n_classes))  # 1.0 for classes with no rules
            for c in range(n_classes):
                class_rules = [j for j, cons in enumerate(consequents) if cons == c]
                if class_rules:
                    scores[:, c] = 1 - np.max(rule_scores[:, class_rules], axis=1)
            return scores
        elif self.score_type == 'entropy':
            # Entropy of the class probability distribution (independent of the
            # candidate class), normalized by the maximum entropy log(n_classes)
            probs = np.clip(self.clf.predict_proba(X), 1e-10, 1)  # Avoid log(0)
            entropy = -np.sum(probs * np.log(probs), axis=1)
            max_entropy = np.log(n_classes)
            if max_entropy > 0:
                entropy = entropy / max_entropy
            return np.repeat(entropy[:, None], n_classes, axis=1)
        else:
            raise ValueError(f"Unknown score_type: {self.score_type}")
def _compute_p_value(self, score: float, class_idx: int) -> float:
"""
Compute conformal p-value for a given score and class.
Uses the formula: (#{s_i >= s} + 1) / (n + 1)
where s_i are calibration scores and s is the test score.
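        For example, with calibration scores [0.2, 0.5, 0.8] for the class
        and a test score s = 0.6, the p-value is (1 + 1) / (3 + 1) = 0.5.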
"""
if class_idx not in self._calibration_scores:
return 0.0
cal_scores = self._calibration_scores[class_idx]
if len(cal_scores) == 0:
return 0.0
# P-value = proportion of calibration scores >= test score
# Adding +1 to numerator and denominator for finite sample correction
return (np.sum(cal_scores >= score) + 1) / (len(cal_scores) + 1)
def _calibrate_rules(self, X_cal: np.ndarray, y_cal: np.ndarray):
"""Calibrate per-rule thresholds for rule-wise conformal prediction."""
rule_scores = self.clf.predict_proba_rules(X_cal, truth_degrees=False)
consequents = self.clf.rule_base.get_consequents()
n_rules = len(consequents)
self._rule_calibration = {}
for rule_idx in range(n_rules):
rule_class = consequents[rule_idx]
# Get scores for samples where this rule's class is correct
correct_mask = y_cal == rule_class
if np.sum(correct_mask) > 0:
self._rule_calibration[rule_idx] = rule_scores[correct_mask, rule_idx]
def _get_rule_confidence(self, rule_idx: int, score: float) -> float:
"""Get confidence for a rule firing at given strength."""
if self._rule_calibration is None or rule_idx not in self._rule_calibration:
return 0.0
cal_scores = self._rule_calibration[rule_idx]
if len(cal_scores) == 0:
return 0.0
# Confidence = proportion of calibration scores <= current score
return float(np.sum(cal_scores <= score) / len(cal_scores))
def _split_calibration(self, X, y, cal_size):
"""Split data into training and calibration sets (stratified)."""
from sklearn.model_selection import train_test_split
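        # random_state is fixed so the automatic split is reproducible.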
return train_test_split(X, y, test_size=cal_size, stratify=y, random_state=42)
def get_calibration_info(self) -> Dict:
"""
Return calibration statistics for inspection.
Returns
-------
info : dict
Dictionary containing:
- 'n_calibration_samples': total calibration samples
- 'samples_per_class': dict of samples per class
- 'score_type': the nonconformity score type used
- 'n_rules_calibrated': number of rules with calibration data
Examples
--------
>>> info = conf_clf.get_calibration_info()
>>> print(f"Calibrated with {info['n_calibration_samples']} samples")
>>> print(f"Score type: {info['score_type']}")
"""
if not self._calibrated:
return {}
return {
'n_calibration_samples': sum(len(v) for v in self._calibration_scores.values()),
'samples_per_class': {k: len(v) for k, v in self._calibration_scores.items()},
'score_type': self.score_type,
'n_rules_calibrated': len(self._rule_calibration) if self._rule_calibration else 0
}
# Delegate common properties to wrapped classifier
@property
def rule_base(self):
"""The underlying rule base from the fuzzy classifier."""
return self.clf.rule_base
@property
def nclasses_(self):
"""Number of classes."""
return self.clf.nclasses_
def print_rules(self, return_rules: bool = False, bootstrap_results: bool = False):
"""Print or return the rules from the underlying classifier."""
return self.clf.print_rules(return_rules=return_rules,
bootstrap_results=bootstrap_results)
def evaluate_conformal_coverage(conf_clf: ConformalFuzzyClassifier,
X_test: np.ndarray, y_test: np.ndarray,
alpha: float = 0.1) -> Dict:
"""
Evaluate conformal prediction coverage and efficiency.
Computes metrics to assess the quality of conformal predictions,
including whether the coverage guarantee is satisfied and how
informative the prediction sets are.
Parameters
----------
conf_clf : ConformalFuzzyClassifier
A calibrated conformal classifier
X_test : array-like of shape (n_samples, n_features)
Test samples
y_test : array-like of shape (n_samples,)
True labels for test samples
alpha : float, default=0.1
Significance level used for prediction sets
Returns
-------
metrics : dict
Dictionary containing:
- 'coverage': Empirical coverage (should be >= 1-alpha)
- 'expected_coverage': The target coverage (1-alpha)
- 'avg_set_size': Average prediction set size
- 'efficiency': 1 / avg_set_size (higher is better)
- 'empty_sets': Proportion of empty prediction sets
- 'singleton_sets': Proportion of single-class sets
- 'coverage_by_class': Per-class coverage rates
Examples
--------
>>> metrics = evaluate_conformal_coverage(conf_clf, X_test, y_test, alpha=0.1)
>>> print(f"Coverage: {metrics['coverage']:.3f} (expected: {metrics['expected_coverage']:.3f})")
>>> print(f"Average set size: {metrics['avg_set_size']:.2f}")
>>> print(f"Singleton rate: {metrics['singleton_sets']:.2%}")
>>> # Check coverage guarantee
>>> if metrics['coverage'] >= metrics['expected_coverage'] - 0.05:
... print("Coverage guarantee approximately satisfied")
"""
X_test = np.asarray(X_test)
y_test = np.asarray(y_test)
pred_sets = conf_clf.predict_set(X_test, alpha=alpha)
# Coverage: proportion where true class is in prediction set
coverage = np.mean([y_test[i] in pred_sets[i] for i in range(len(y_test))])
# Average set size
set_sizes = [len(s) for s in pred_sets]
avg_size = np.mean(set_sizes)
# Efficiency metrics
empty = np.mean([len(s) == 0 for s in pred_sets])
singleton = np.mean([len(s) == 1 for s in pred_sets])
# Per-class coverage
coverage_by_class = {}
for c in np.unique(y_test):
mask = y_test == c
class_coverage = np.mean([y_test[i] in pred_sets[i] for i in np.where(mask)[0]])
coverage_by_class[int(c)] = class_coverage
return {
'coverage': coverage,
'expected_coverage': 1 - alpha,
'avg_set_size': avg_size,
'efficiency': 1 / avg_size if avg_size > 0 else 0,
'empty_sets': empty,
'singleton_sets': singleton,
'coverage_by_class': coverage_by_class
}
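

if __name__ == '__main__':
    # Illustrative smoke test (not part of the library API): runs the
    # workflow from the module docstring on the iris dataset. Assumes
    # scikit-learn is available; n_gen=10 is only enough for a quick demo
    # fit, not a converged rule base.
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=0)

    conf_clf = ConformalFuzzyClassifier(nRules=10, nAnts=3)
    conf_clf.fit(X_train, y_train, cal_size=0.2, n_gen=10)

    metrics = evaluate_conformal_coverage(conf_clf, X_test, y_test, alpha=0.1)
    print(f"Coverage: {metrics['coverage']:.3f} "
          f"(target: {metrics['expected_coverage']:.2f}), "
          f"avg set size: {metrics['avg_set_size']:.2f}")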