from itertools import product from tqdm import tqdm from typing import Union, Callable, Counter import numpy as np from sklearn.feature_extraction.text import CountVectorizer from sklearn.utils import resample from sklearn.preprocessing import normalize from method.confidence import WithConfidenceABC, ConfidenceRegionABC from quapy.functional import get_divergence from quapy.method.base import BaseQuantifier, BinaryQuantifier import quapy.functional as F from scipy.optimize import lsq_linear from scipy import sparse class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier): """ The `Maximum Likelihood Prevalence Estimation` (MLPE) method is a lazy method that assumes there is no prior probability shift between training and test instances (put it other way, that the i.i.d. assumpion holds). The estimation of class prevalence values for any test sample is always (i.e., irrespective of the test sample itself) the class prevalence seen during training. This method is considered to be a lower-bound quantifier that any quantification method should beat. """ def __init__(self): self._classes_ = None def fit(self, X, y): """ Computes the training prevalence and stores it. :param X: array-like of shape `(n_samples, n_features)`, the training instances :param y: array-like of shape `(n_samples,)`, the labels :return: self """ self._classes_ = F.classes_from_labels(labels=y) self.estimated_prevalence = F.prevalence_from_labels(y, classes=self._classes_) return self def predict(self, X): """ Ignores the input instances and returns, as the class prevalence estimantes, the training prevalence. :param X: array-like (ignored) :return: the class prevalence seen during training """ return self.estimated_prevalence class DMx(BaseQuantifier): """ Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of covariates. This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters. :param nbins: number of bins used to discretize the distributions (default 8) :param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented) or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger Distance) :param cdf: whether to use CDF instead of PDF (default False) :param n_jobs: number of parallel workers (default None) """ def __init__(self, nbins=8, divergence: Union[str, Callable]='HD', cdf=False, search='optim_minimize', n_jobs=None): self.nbins = nbins self.divergence = divergence self.cdf = cdf self.search = search self.n_jobs = n_jobs @classmethod def HDx(cls, n_jobs=None): """ `Hellinger Distance x `_ (HDx). HDx is a method for training binary quantifiers, that models quantification as the problem of minimizing the average divergence (in terms of the Hellinger Distance) across the feature-specific normalized histograms of two representations, one for the unlabelled examples, and another generated from the training examples as a mixture model of the class-specific representations. The parameters of the mixture thus represent the estimates of the class prevalence values. The method computes all matchings for nbins in [10, 20, ..., 110] and reports the mean of the median. The best prevalence is searched via linear search, from 0 to 1 stepping by 0.01. :param n_jobs: number of parallel workers :return: an instance of this class setup to mimick the performance of the HDx as originally proposed by González-Castro, Alaiz-Rodríguez, Alegre (2013) """ from quapy.method.meta import MedianEstimator dmx = DMx(divergence='HD', cdf=False, search='linear_search') nbins = {'nbins': np.linspace(10, 110, 11, dtype=int)} hdx = MedianEstimator(base_quantifier=dmx, param_grid=nbins, n_jobs=n_jobs) return hdx def __get_distributions(self, X): histograms = [] for feat_idx in range(self.nfeats): feature = X[:, feat_idx] feat_range = self.feat_ranges[feat_idx] hist = np.histogram(feature, bins=self.nbins, range=feat_range)[0] norm_hist = hist / hist.sum() histograms.append(norm_hist) distributions = np.vstack(histograms) if self.cdf: distributions = np.cumsum(distributions, axis=1) return distributions def fit(self, X, y): """ Generates the validation distributions out of the training data (covariates). The validation distributions have shape `(n, nfeats, nbins)`, with `n` the number of classes, `nfeats` the number of features, and `nbins` the number of bins. In particular, let `V` be the validation distributions; then `di=V[i]` are the distributions obtained from training data labelled with class `i`; while `dij = di[j]` is the discrete distribution for feature j in training data labelled with class `i`, and `dij[k]` is the fraction of instances with a value in the `k`-th bin. :param X: array-like of shape `(n_samples, n_features)`, the training instances :param y: array-like of shape `(n_samples,)`, the labels """ self.nfeats = X.shape[1] self.feat_ranges = _get_features_range(X) n_classes = len(np.unique(y)) self.validation_distribution = np.asarray( [self.__get_distributions(X[y==cat]) for cat in range(n_classes)] ) return self def predict(self, X): """ Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution (the mixture) that best matches the test distribution, in terms of the divergence measure of choice. The matching is computed as the average dissimilarity (in terms of the dissimilarity measure of choice) between all feature-specific discrete distributions. :param X: instances in the sample :return: a vector of class prevalence estimates """ assert X.shape[1] == self.nfeats, f'wrong shape; expected {self.nfeats}, found {X.shape[1]}' test_distribution = self.__get_distributions(X) divergence = get_divergence(self.divergence) n_classes, n_feats, nbins = self.validation_distribution.shape def loss(prev): prev = np.expand_dims(prev, axis=0) mixture_distribution = (prev @ self.validation_distribution.reshape(n_classes,-1)).reshape(n_feats, -1) divs = [divergence(test_distribution[feat], mixture_distribution[feat]) for feat in range(n_feats)] return np.mean(divs) return F.argmin_prevalence(loss, n_classes, method=self.search) class ReadMe(BaseQuantifier, WithConfidenceABC): """ ReadMe is a non-aggregative quantification system proposed by `Daniel Hopkins and Gary King, 2007. A method of automated nonparametric content analysis for social science. American Journal of Political Science, 54(1):229–247. `_. The idea is to estimate `Q(Y=i)` directly from: :math:`Q(X)=\\sum_{i=1} Q(X|Y=i) Q(Y=i)` via least-squares regression, i.e., without incurring the cost of computing posterior probabilities. However, this poses a very difficult representation in which the vector `Q(X)` and the matrix `Q(X|Y=i)` can be of very high dimensions. In order to render the problem tracktable, ReadMe performs bagging in the feature space. ReadMe also combines bagging with bootstrap in order to derive confidence intervals around point estimations. We use the same default parameters as in the official `R implementation `_. :param prob_model: str ('naive', or 'full'), selects the modality in which the probabilities `Q(X)` and `Q(X|Y)` are to be modelled. Options include "full", which corresponds to the original formulation of ReadMe, in which X is constrained to be a binary matrix (e.g., of term presence/absence) and in which `Q(X)` and `Q(X|Y)` are modelled, respectively, as matrices of `(2^K, 1)` and `(2^K, n)` values, where `K` is the number of columns in the data matrix (i.e., `bagging_range`), and `n` is the number of classes. Of course, this approach is computationally prohibited for large `K`, so the computation is restricted to data matrices with `K<=25` (although we recommend even smaller values of `K`). A much faster model is "naive", which considers the `Q(X)` and `Q(X|Y)` be multinomial distributions under the `bag-of-words` perspective. In this case, `bagging_range` can be set to much larger values. Default is "full" (i.e., original ReadMe behavior). :param bootstrap_trials: int, number of bootstrap trials (default 300) :param bagging_trials: int, number of bagging trials (default 300) :param bagging_range: int, number of features to keep for each bagging trial (default 15) :param confidence_level: float, a value in (0,1) reflecting the desired confidence level (default 0.95) :param region: str in 'intervals', 'ellipse', 'ellipse-clr'; indicates the preferred method for defining the confidence region (see :class:`WithConfidenceABC`) :param random_state: int or None, allows replicability (default None) :param verbose: bool, whether to display information during the process (default False) """ MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION = 25 PROBABILISTIC_MODELS = ["naive", "full"] def __init__(self, prob_model="full", bootstrap_trials=300, bagging_trials=300, bagging_range=15, confidence_level=0.95, region='intervals', random_state=None, verbose=False): assert prob_model in ReadMe.PROBABILISTIC_MODELS, \ f'unknown {prob_model=}, valid ones are {ReadMe.PROBABILISTIC_MODELS=}' self.prob_model = prob_model self.bootstrap_trials = bootstrap_trials self.bagging_trials = bagging_trials self.bagging_range = bagging_range self.confidence_level = confidence_level self.region = region self.random_state = random_state self.verbose = verbose def fit(self, X, y): self._check_matrix(X) self.rng = np.random.default_rng(self.random_state) self.classes_ = np.unique(y) Xsize = X.shape[0] # Bootstrap loop self.Xboots, self.yboots = [], [] for _ in range(self.bootstrap_trials): idx = self.rng.choice(Xsize, size=Xsize, replace=True) self.Xboots.append(X[idx]) self.yboots.append(y[idx]) return self def predict_conf(self, X, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC): self._check_matrix(X) n_features = X.shape[1] boots_prevalences = [] for Xboots, yboots in tqdm( zip(self.Xboots, self.yboots), desc='bootstrap predictions', total=self.bootstrap_trials, disable=not self.verbose ): bagging_estimates = [] for _ in range(self.bagging_trials): feat_idx = self.rng.choice(n_features, size=self.bagging_range, replace=False) Xboots_bagging = Xboots[:, feat_idx] X_boots_bagging = X[:, feat_idx] bagging_prev = self._quantify_iteration(Xboots_bagging, yboots, X_boots_bagging) bagging_estimates.append(bagging_prev) boots_prevalences.append(np.mean(bagging_estimates, axis=0)) conf = WithConfidenceABC.construct_region(boots_prevalences, confidence_level, method=self.region) prev_estim = conf.point_estimate() return prev_estim, conf def predict(self, X): prev_estim, _ = self.predict_conf(X) return prev_estim def _quantify_iteration(self, Xtr, ytr, Xte): """Single ReadMe estimate.""" PX_given_Y = np.asarray([self._compute_P(Xtr[ytr == c]) for i,c in enumerate(self.classes_)]) PX = self._compute_P(Xte) res = lsq_linear(A=PX_given_Y.T, b=PX, bounds=(0, 1)) pY = np.maximum(res.x, 0) return pY / pY.sum() def _check_matrix(self, X): """the "full" model requires estimating empirical distributions; due to the high computational cost, this function is only made available for binary matrices""" if self.prob_model == 'full' and not self._is_binary_matrix(X): raise ValueError('the empirical distribution can only be computed efficiently on binary matrices') def _is_binary_matrix(self, X): data = X.data if sparse.issparse(X) else X return np.all((data == 0) | (data == 1)) def _compute_P(self, X): if self.prob_model == 'naive': return self._multinomial_distribution(X) elif self.prob_model == 'full': return self._empirical_distribution(X) else: raise ValueError(f'unknown {self.prob_model}; valid ones are {ReadMe.PROBABILISTIC_MODELS=}') def _empirical_distribution(self, X): if X.shape[1] > self.MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION: raise ValueError(f'the empirical distribution can only be computed efficiently for dimensions ' f'less or equal than {self.MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION}') # we convert every binary row (e.g., 0 0 1 0 1) into the equivalent number (e.g., 5) K = X.shape[1] binary_powers = 1 << np.arange(K-1, -1, -1) # (2^K, ..., 32, 16, 8, 4, 2, 1) X_as_binary_numbers = X @ binary_powers # count occurrences and compute probs counts = np.bincount(X_as_binary_numbers, minlength=2 ** K).astype(float) probs = counts / counts.sum() return probs def _multinomial_distribution(self, X): PX = np.asarray(X.sum(axis=0)) PX = normalize(PX, norm='l1', axis=1) return PX.ravel() def _get_features_range(X): feat_ranges = [] ncols = X.shape[1] for col_idx in range(ncols): feature = X[:,col_idx] feat_ranges.append((np.min(feature), np.max(feature))) return feat_ranges #--------------------------------------------------------------- # aliases #--------------------------------------------------------------- DistributionMatchingX = DMx