Source code for mcfa.mcfa


""" Mixture of common factor analyzers. """

import json
import logging
import numpy as np
import warnings
from copy import deepcopy
from inspect import signature as inspect_signature
from scipy import linalg, spatial, stats
from scipy.special import logsumexp, gammaln, gamma
from sklearn import cluster
from sklearn.utils import check_random_state
from sklearn.utils.extmath import row_norms
from time import time
from tqdm import tqdm

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# I think this is The Right Thing(tm) to do.
_INFLATE_PSI_AT_EACH_EM_STEP = True

[docs]class MCFA(object): r""" A mixture of common factor analyzers model. """ def __init__(self, n_components, n_latent_factors, covariance_regularization=0, max_iter=10000, tol=1e-5, init_components="kmeans++", init_factors="svd", verbose=1, random_seed=None, **kwargs): r""" A mixture of common factor analyzers model. :param n_components: The number of components (clusters) in the mixture. :param n_latent_factors: The number of common latent factors. :param covariance_regularization: [optional] An additive term to apply to the covariance matrices of factor scores in latent space, which aids in numerical stability. :param max_iter: [optional] The maximum number of expectation-maximization iterations. :param tol: [optional] Relative tolerance before declaring convergence. :param init_components: [optional] The iniitialisation method to use when assigning data points to components. Available options include: 'kmeans++', 'random', or a callable function that takes three arguments: the data, an initial estimate of the latent factors, and the number of components. This function should return three quantities: an array of the relative weights, the means in factor scores, the covariance matrices in factor scores. :param init_factors: [optional] The initialisation method to use for the latent factors. Available options include: 'svd', 'random', or a callable functiomn that takes the input data as a single argument and returns a matrix of latent factors. :param verbose: [optional] Show warning messages. :param random_seed: [optional] A seed for the random number generator. """ self.n_components = int(n_components) self.n_latent_factors = int(n_latent_factors) self.max_iter = int(max_iter) self.tol = float(tol) self.random_seed = check_random_state(random_seed) self.init_components = init_components self.init_factors = init_factors self.verbose = int(verbose) self.covariance_regularization = float(covariance_regularization) if self.covariance_regularization < 0: raise ValueError("covariance_regularization must be non-negative") if self.n_latent_factors < 1: raise ValueError("n_latent_factors must be a positive integer") if self.n_components < 1: raise ValueError("n_components must be a positive integer") if self.max_iter < 1: raise ValueError("number of iterations must be greater than one") if self.tol <= 0: raise ValueError("tol must be greater than zero") available_init_components = ("random", "kmeans++") if self.init_components not in available_init_components \ and not hasattr(self.init_components, "__call__"): raise ValueError(f"init_components must be one of {available_init_components} " f"or be a callable function") available_init_factors = ("random", "noise", "svd") if self.init_factors not in available_init_factors \ and not hasattr(self.init_factors, "__call__"): raise ValueError(f"init_factors must be one of {available_init_factors} " f"or be a callable function") self.log_likelihoods_ = [] return None
[docs] def serialize(self): r""" Serialize the object so that it can be saved to disk. """ result = dict() for result_attr in ("tau_", "theta_", "log_likelihood_", "n_iter_"): value = getattr(self, result_attr, None) if value is not None: result[result_attr] = value return json.dumps({ "class": self.__class__.__name__, "args": (self.n_components, self.n_latent_factors), "kwargs": dict(max_iter=self.max_iter, n_init=self.n_init, tol=self.tol, verbose=self.verbose, random_seed=self.random_seed), "result": result })
[docs] @classmethod def deserialize(cls, data): r""" De-serialize the data and return an object. :param data: Serialized data describing the object. :returns: A :class:`mcfa.MCFA` object. """ params = json.loads(data) klass = cls(*params["args"], **params["kwargs"]) for k, v in params["result"].items(): setattr(klass, k, v) return klass
def _check_data(self, X, warn_about_whitening=False): r""" Verify that the latent space has lower dimensionality than the data space. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :raises ValueError: If the data array has non-finite entries, or if there are more latent factors than there are dimensions. :returns: The data array, ensuring it is a 2D array. """ Y = np.atleast_2d(X).copy() #if not np.all(np.isfinite(Y)): # logger.warn("Non-finite data points will be treated as missing data at random.") N, D = Y.shape if D > N: logger.warning(f"There are more dimensions than data ({D} > {N})!") if D <= self.n_latent_factors: raise ValueError(f"there are more factors than dimensions "\ f"({self.n_latent_factors} >= {D})") # Check to see if the data are whitened. mu, sigma = (np.nanmean(Y, axis=0), np.nanstd(Y, axis=0)) if warn_about_whitening and \ not (np.allclose(mu, np.zeros(D)) or np.allclose(sigma, np.ones(D))): logger.warn("Supplied data do not appear to be whitened. "\ "Use mcfa.utils.whiten(X) to whiten the data.") if not np.all(np.isfinite(Y)): missing = ~np.isfinite(Y) Y[missing] = 0 else: missing = None # Pre-calculate X2, because we will use this at every EM step. self._check_precomputed_X2(Y, missing) return (Y, missing) def _check_precomputed_X2(self, X, missing=None, **kwargs): r""" Compute and store X^2 if it is not already calculated. If it is pre-computed, check a random entry of the matrix, and raise a `ValueError` exception if it does not meet expected tolerance. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. """ try: self._X2 except AttributeError: self._X2 = np.square(X) else: # Check a single entry. if missing is None: i, j = (np.random.choice(X.shape[0]), np.random.choice(X.shape[1])) expected, actual = (np.square(X[i, j]), self._X2[i, j]) else: ii, jj = np.where(~missing) idx = np.random.choice(ii.size) i, j = ii[idx], jj[idx] expected, actual = (np.square(X[i, j]), self._X2[i, j]) # Update actual values for missing entries assuming that non-missing entries are OK self._X2[missing] = np.square(X[missing]) if not np.allclose(expected, actual, **kwargs): raise ValueError( f"pre-computed X^2 does not match actual X^2 at {i}, {j} "\ f"({expected} != {actual})") return self._X2 def _initial_parameters(self, X, missing=None, random_state=None): r""" Estimate the initial parameters of the model. :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. :raises ValueError: If no valid initialisation point could be found. :returns: A five-length tuple containing the updated parameter estimates for the mixing weights :math:`\pi`, the common factor loads :math:`A`, the means of the components in latent space :math:`\xi`, the covariance matrices of components in latent space :math:`\omega`, and the variance in each dimension :math:`\psi`. """ previous_theta = getattr(self, "theta_", None) if previous_theta is not None: logger.warn("Running from previous estimates of theta") return previous_theta initial_factor_func = { "random": _initial_factor_loads_by_random, "noise": _initial_factor_loads_by_noise, "svd": _initial_factor_loads_by_svd }.get(self.init_factors, self.init_factors) A = initial_factor_func(X, self.n_latent_factors, random_state) initial_components_func = { "random": _initial_components_by_random, "kmeans++": _initial_components_by_kmeans_pp, }.get(self.init_components, self.init_components) # TODO: use random state. pi, xi, omega = initial_components_func(X, A, self.n_components, random_state) N, D = X.shape # TODO: Use np.ones (and potentially grow?) or use np.var (and potentially shrink?) #psi = _inflate_psi(np.var(X, axis=0), missing) #psi = _inflate_psi(np.ones(D), missing) psi = np.ones(D) return (pi, A, xi, omega, psi) @property def parameter_names(self): r""" Return the names of the parameters in this model. """ return tuple(inspect_signature(self.expectation).parameters.keys())[1:]
[docs] def number_of_parameters(self, D): r""" Return the number of model parameters :math:`Q` required to describe data of :math:`D` dimensions. .. math:: Q = (K - 1) + D + J(D + K) + \frac{1}{2}KJ(J + 1) - J^2 Where :math:`K` is the number of components, :math:`D` is the number of dimensions in the data, and :math:`J` is the number of latent factors. :param D: The dimensionality of the data (the number of features). :returns: The number of model parameters, :math:`Q`. """ J, K = self.n_latent_factors, self.n_components return int((K - 1) + D + J*(D + K) + (K*J*(J+1))/2 - J**2)
[docs] def expectation(self, X, pi, A, xi, omega, psi, **kwargs): r""" Compute the conditional expectation of the complete-data log-likelihood given the observed data :math:`X` and the given model parameters. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param pi: The relative weights for the components in the mixture. This should have size `n_components` and the entries should sum to one. :param A: The common factor loads between mixture components. This should have shape [n_features, n_latent_factors]. :param xi: The mean factors for the components in the mixture. This should have shape [n_latent_factors, n_components]. :param omega: The covariance matrix of the mixture components in latent space. This array should have shape [n_latent_factors, n_latent_factors, n_components]. :param psi: The variance in each dimension. This should have size [n_features]. :raises scipy.linalg.LinAlgError: If the covariance matrix of any mixture component in latent space is ill-conditioned or singular. :returns: A two-length tuple containing the sum of the log-likelihood for the data given the model, and the responsibility matrix :math:`\tau` giving the partial associations between each data point and each component in the mixture. """ kw = dict(verbose=self.verbose, covariance_regularization=self.covariance_regularization) ll, tau = _expectation(X, pi, A, xi, omega, psi, **{**kw, **kwargs}) self.log_likelihoods_.append(ll) return (ll, tau)
[docs] def maximization(self, X, tau, pi, A, xi, omega, psi, **kwargs): r""" Compute the updated estimates of the model parameters given the data, the responsibility matrix :math:`\tau`, and the current estimates of the model parameters. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param tau: The responsibility matrix, which is expected to have shape [n_samples, n_components]. The sum of each row is expected to equal one, and the value in the i-th row (sample) of the j-th column (component) indicates the partial responsibility (between zero and one) that the j-th component has for the i-th sample. :param pi: The relative weights for the components in the mixture. This should have size `n_components` and the entries should sum to one. :param A: The common factor loads between mixture components. This should have shape [n_features, n_latent_factors]. :param xi: The mean factors for the components in the mixture. This should have shape [n_latent_factors, n_components]. :param omega: The covariance matrix of the mixture components in latent space. This array should have shape [n_latent_factors, n_latent_factors, n_components]. :param psi: The variance in each dimension. This should have size [n_features]. :returns: A five-length tuple containing the updated parameter estimates for the mixing weights :math:`\pi`, the common factor loads :math:`A`, the means of the components in latent space :math:`\xi`, the covariance matrices of components in latent space :math:`\omega`, and the variance in each dimension :math:`\psi`. """ kw = dict(X2=self._check_precomputed_X2(X, **kwargs), covariance_regularization=self.covariance_regularization) return _maximization(X, tau, pi, A, xi, omega, psi, **{**kw, **kwargs})
def _check_convergence(self, previous, current): r""" Check whether the expectation-maximization algorithm has converged based on the previous cost (e.g., log-likelihood or message length), and the current cost. :param previous: The cost of the previous estimate of the model parameters. :param current: The cost of the current estimate of the model parameters. :returns: A three-length tuple containing: a boolean flag indicating whether convergence has been achieved, the current cost, and the ratio of the current cost relative to the previous cost. """ if not np.isfinite(current): logger.warn("Non-finite log likelihood.") if previous > current: logger.warn(f"Log likelihood *decreased* by {previous-current:.2e}") ratio = abs((current - previous)/current) converged = (self.tol >= ratio) and previous < current if converged: logger.debug(f"Converged because ({self.tol:.1e} >= {ratio:.1e}) and ({previous} < {current}") logger.debug(f"Ratio: {ratio:.1e}, delta: {current-previous:.1e}") return (converged, current, ratio)
[docs] def fit(self, X, init_params=None, **kwargs): r""" Fit the model to the data, :math:`Y`. :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. :param init_params: [optional] A dictionary of initial values to run expectation-maximization from. :returns: The fitted model. """ Y, missing = self._check_data(X) # Allow for multiple initialisations. n_inits = kwargs.get("n_inits", 1) initial_lls = [] initial_params = [] # Generate seeds. for seed in np.random.randint(2**32, size=n_inits): theta = prev_theta = self._initial_parameters(Y, missing, seed) \ if init_params is None else deepcopy(init_params) ll, tau = self.expectation(Y, *theta) initial_lls.append(ll) initial_params.append([theta, tau]) logger.debug(f"Random seed: {seed} and initial log likelihood {ll:.3e}") # Get best. idx = np.argmax(initial_lls) prev_ll = initial_lls[idx] theta, tau = initial_params[idx] logger.debug(f"Peak-to-peak of initial log likelihoods: {np.ptp(initial_lls)}") # TODO: start n_iter counter from previous value if we are starting from # a previously optimised value. # Run E-M. tqdm_kwds = dict(desc="E-M", total=self.max_iter) for n_iter in tqdm(range(1, 1 + self.max_iter), **tqdm_kwds): try: theta = self.maximization(Y, tau, *theta, missing=missing) ll, tau = self.expectation(Y, *theta) except: logger.exception(f"Exception occurred during E-M algorithm:") logger.warning(f"Solution is unlikely to be converged.") theta = prev_theta if kwargs.get("debug", False): raise break converged, prev_ll, ratio = self._check_convergence(prev_ll, ll) prev_theta = theta if converged: break if missing is not None: # Update missing data values. U, UC, Umean, tau = _factor_scores(Y, *theta, tau=tau) Y_approx = (theta[1] @ Umean.T).T # Now update the Y values with our expectations given the model, and re-calculate # the log-likelihood as a sanity check. Y[missing] = Y_approx[missing] else: if self.verbose >= 0: logger.warning(f"Convergence not achieved after at least "\ f"{self.max_iter} iterations ({ratio} > {self.tol})."\ f" Consider increasing the maximum number of iterations.") try: ll except NameError: raise ValueError("bad initialisation") pi, A, xi, omega, psi = theta # Make A.T @ A = I try: AL = linalg.cholesky(A.T @ A) except: logger.exception("Exception trying to ensure valid rotation matrix. " "Result may not be consistent!") else: A = A @ linalg.solve(AL, np.eye(self.n_latent_factors)) xi = AL @ xi for i in range(self.n_components): omega[:, :, i] = AL @ omega[:, :, i] @ AL.T A = _post_check_factor_loads(A) if not _INFLATE_PSI_AT_EACH_EM_STEP: psi = _inflate_psi(psi, missing) self.tau_ = tau self.theta_ = [pi, A, xi, omega, psi] self.log_likelihood_ = ll self.n_iter_ = n_iter return self
[docs] def factor_scores(self, X): r""" Estimate the posterior factor scores given the model parameters. :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. """ return _factor_scores(X, *self.theta_)
[docs] def bic(self, X, theta=None, log_likelihood=None): r""" Estimate the Bayesian Information Criterion given the model and the data. :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. :param theta: [optional] The model parameters :math:`\theta`. If None is given then the model parameters from `self.theta_` will be used. :returns: The Bayesian Information Criterion (BIC) for the model and the data. A smaller BIC value is often used as a statistic to select a single model from a class of models. """ N, D = X.shape if log_likelihood is None: if theta is None: theta = self.theta_ log_likelihood, tau = self.expectation(X, *theta) return np.log(N) * self.number_of_parameters(D) - 2 * log_likelihood
[docs] def pseudo_bic(self, X, gamma=0.1, omega=1, theta=None): r""" Estimate the pseudo Bayesian Information Criterion given the model and the data as per Gao and Carroll (2017): .. math: \textrm{pseudo - BIC} = 6(1 + \gamma)\omega\log{N}Q - 2\log{mathcal{L}} :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. :param theta: [optional] The model parameters :math:`\theta`. If None is given then the model parameters from `self.theta_` will be used. :returns: The Bayesian Information Criterion (BIC) for the model and the data. A smaller BIC value is often used as a statistic to select a single model from a class of models. """ if not gamma > 0: raise ValueError("gamma should be greater than zero") if not omega >= 1: raise ValueError("omega should be at least unity or higher") if theta is None: theta = self.theta_ N, D = np.atleast_2d(X).shape log_likelihood, tau = self.expectation(X, *theta) Q = self.number_of_parameters(D) return 6 * (1 + gamma) * omega * np.log(N) * Q - 2 * log_likelihood
[docs] def message_length(self, X, theta=None, log_likelihood=None): r""" Estimate the explanation length given the model and the data. :param X: The data, :math:`X`, which is expected to be an array of shape [n_samples, n_features]. :param theta: [optional] The model parameters :math:`\theta`. If None is given then the model parameters from `self.theta_` will be used. """ if theta is None: theta = self.theta_ if log_likelihood is None: log_likelihood, tau = self.expectation(X, *theta) N, D = np.atleast_2d(X).shape # For just MLF: pi, A, xi, omega, psi = theta J, K = self.n_latent_factors, self.n_components # Let's do this by parts. I_parts = dict() # Mixing weights. # I(w) = -log\gamma(K) + 0.5 * (K-1)\log{NN} -0.5\sum_{K}\log\weight_k slog_pi = np.sum(np.log(pi)) I_parts["pi"] = -gammaln(K) + 0.5 * (K - 1) * np.log(N) - 0.5 * slog_pi # Means and covariance matrices of various components. # I(epsilon, omega) _, logdet_omega = np.linalg.slogdet(omega.T) I_parts["xi,omega"] = -(J + 3/2) * np.sum(logdet_omega) \ + 0.25 * J * (J + 3) * (slog_pi + K * np.log(N)) \ - 0.5 * J * K * np.log(2) # Factor loads. # I(A) = -log(p(A)) + 0.5 * log|F(A)| # TODO: Don't know 0.5 * log|F(A)| yet, so I(A) here = -log(p(A)) # log(p(A)) = - S = np.atleast_2d(np.cov(A.T)) _, logdet_S = np.linalg.slogdet(S) S_inv = np.linalg.solve(S, np.eye(J)) M = A.T @ A _, logdet_M = np.linalg.slogdet(M) I_parts["A"] = 0.5 * np.trace(S_inv @ M) \ - 0.5 * (D - J - 1) * logdet_M \ + 0.5 * D * J * np.log(2) \ + 0.5 * D * logdet_S \ + gammaln(D/2) # Encode the number of components. # Assume p(K) \propto 2^{-K} # I(K) = -log(p(K)) = K * log(2) I_parts["K"] = K * np.log(2) # Encode the number of factors. # Assume p(J) \propto 2^{-J} # I(J) = -log(p(J)) = J * log(2) I_parts["J"] = J * np.log(2) I_parts["nll"] = -np.sum(log_likelihood) # Constant terms: Q = self.number_of_parameters(D) I_parts["lattice"] = -0.5 * Q * np.log(2*np.pi) \ + 0.5 * np.log(Q * np.pi) \ - np.euler_gamma return np.sum(np.array(list(I_parts.values())))
[docs] def sample(self, n_samples=1, theta=None): r""" Generate random samples from the fitted model. :param n_samples: [optional] Number of samples to generate. Defaults to 1. # TODO: return docs """ if theta is None: theta = self.theta_ pi, A, xi, omega, psi = theta # Draw which component it is from. taus = np.random.choice(self.n_components, size=n_samples, p=pi) # Draw from the latent space and project to real space. N, D = (n_samples, psi.size) X = np.empty((N, D)) for k in range(self.n_components): match = (taus == k) scores = np.random.multivariate_normal(xi.T[k], omega.T[k], size=sum(taus == k)) # Project to real space. X[match] = (A @ scores.T).T # Add noise. X += np.random.multivariate_normal(np.zeros(D), np.diag(psi), N) return X
[docs] def rotate(self, R, X=None, ensure_valid_rotation=True, atol=1e-3, rtol=1e-5): r""" Rotate the factor loads and factor scores by a valid rotation matrix. :param R: A `J` times `J` rotation matrix, where `J` is the number of latent factors. :param X: [optional] The data, which is expected to be an array with shape [n_samples, n_features]. If given, the log-likelihood will be evaluated before and after rotation. A warning will be raised if the log-likelihood changes by more than the convergence tolerance. :param ensure_valid_rotation: [optional] If the rotation matrix does not follow R @ R.T = I, then the nearest rotation matrix with this property will be used. :param atol: [optional] The absolute tolerance acceptable for individual entries in the matrix I - R @ R.T. Default is 1e-3. :param rtol: The relative tolerance acceptable for individual entries in the matrix I - R @ R.T. Default is 1e-5. :returns: The actual rotation matrix applied. """ # Check that it is a rotation matrix. R = np.atleast_2d(R) J, J_ = R.shape if J != J_: raise ValueError("rotation matrix is not square") if not np.allclose(R @ R.T, np.eye(J), atol=atol, rtol=rtol): # Do our own rotation. L = linalg.cholesky(R.T @ R) R = R @ linalg.solve(L, np.eye(self.n_latent_factors)) if not np.allclose(R @ R.T, np.eye(J), atol=atol, rtol=rtol): raise ValueError("R is not a valid rotation matrix") if X is not None: ll, tau = self.expectation(X, *self.theta_) pi, A, xi, omega, psi = self.theta_ # A covariance matrix can be decomposed as: # \Sigma = Q @ \Lambda @ Q' # Where Q is a valid rotation matrix and \Lambda acts like a scaling # matrix. # Thus a rotated covariance matrix is: # \Sigma_rot = R.T @ omega_ @ R self.theta_ = (pi, A @ R, (xi.T @ R).T, (R.T @ omega.T @ R).T, psi) if X is not None: ll_r, tau_r = self.expectation(X, *self.theta_) if np.abs(ll - ll_r) > self.tol: logger.warning(f"Log-likelihood changed after rotation: "\ f"{ll - ll_r} (>{self.tol})") return R
def _initial_factor_loads_by_random(X, n_latent_factors, random_state=None): N, D = X.shape A = stats.ortho_group.rvs(D, random_state=random_state)[:, :n_latent_factors] AL = linalg.cholesky(A.T @ A) A = A @ linalg.solve(AL, np.eye(n_latent_factors)) return A def _initial_factor_loads_by_noise(X, n_latent_factors, random_state=None, scale=1e-2): # TODO: use random state. N, D = X.shape return np.random.normal(0, scale, size=(D, n_latent_factors)) def _initial_factor_loads_by_svd(X, n_latent_factors, random_state=None, n_svd_max=1000): # TODO: use random state N, D = X.shape n_svd_max = N if n_svd_max < 0 or n_svd_max > N else int(n_svd_max) idx = np.random.choice(N, n_svd_max, replace=False) U, S, V = linalg.svd(X[idx].T/np.sqrt(N - 1)) return U[:, :n_latent_factors] def _initial_assignments_by_random(X, A, n_components, random_state=None): N, D = X.shape return np.random.choice(n_components, N) def _initial_assignments_by_kmeans_pp(X, A, n_components, random_state=None): r""" Estimate the initial assignments of each sample to each component in the mixture. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param n_components: The number of components (clusters) in the mixture. :returns: An array of shape [1, n_samples] with initial assignments, where each integer entry in the matrix indicates which component the sample is to be initialised to. """ random_state = check_random_state(random_state) Y = X @ A # run k-means++ in the latent space squared_norms = row_norms(Y, squared=True) means = cluster.k_means_._k_init(Y, n_components, random_state=random_state, x_squared_norms=squared_norms) return np.argmin(spatial.distance.cdist(means, Y), axis=0) def _initial_components(X, A, n_components, assignments): N, D = X.shape D, J = A.shape n_components = len(set(assignments)) xi = np.empty((J, n_components)) omega = np.empty((J, J, n_components)) pi = np.empty(n_components) for i in range(n_components): match = (assignments == i) pi[i] = float(sum(match)) / N xs = X[match] @ A xi[:, i] = np.mean(xs, axis=0) omega[:, :, i] = np.cov(xs.T) return (pi, xi, omega) def _initial_components_by_random(X, A, n_components, random_state): assignments = _initial_assignments_by_random(X, A, n_components, random_state) return _initial_components(X, A, n_components, assignments) def _initial_components_by_kmeans_pp(X, A, n_components, random_state): assignments = _initial_assignments_by_kmeans_pp(X, A, n_components, random_state) return _initial_components(X, A, n_components, assignments) def _expectation(X, pi, A, xi, omega, psi, verbose=1, covariance_regularization=0, full_output=False): r""" Compute the conditional expectation of the complete-data log-likelihood given the observed data :math:`X` and the given model parameters. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param pi: The relative weights for the components in the mixture. This should have size `n_components` and the entries should sum to one. :param A: The common factor loads between mixture components. This should have shape [n_features, n_latent_factors]. :param xi: The mean factors for the components in the mixture. This should have shape [n_latent_factors, n_components]. :param omega: The covariance matrix of the mixture components in latent space. This array should have shape [n_latent_factors, n_latent_factors, n_components]. :param psi: The variance in each dimension. This should have size [n_features]. :param verbose: [optional] The verbosity. :raises scipy.linalg.LinAlgError: If the covariance matrix of any mixture component in latent space is ill-conditioned or singular. :returns: A two-length tuple containing the sum of the log-likelihood for the data given the model, and the responsibility matrix :math:`\tau` giving the partial associations between each data point and each component in the mixture. """ N, D = X.shape K = pi.size I_D = np.eye(D) effective_psi = psi + covariance_regularization I_psi = I_D * effective_psi U = (np.diag(1.0/psi) @ A).T log_prob = np.zeros((N, K)) log_prob_constants = -0.5 * (np.sum(np.log(effective_psi)) + D * np.log(2*np.pi)) for i, (xi_, omega_) in enumerate(zip(xi.T, omega.T)): mu = A @ xi_ A_omega_ = A @ omega_ cov = A_omega_ @ A.T + I_psi precision = _compute_precision_cholesky_full(cov) dist = np.sum( np.square(np.dot(X, precision) - np.dot(mu, precision)), axis=1) # Use matrix determinant lemma: # det(A + U @V.T) = det(I + V.T @ A^-1 @ U) * det(A) # here A = omega^{-1}; U = (\psi^-1 @ A).T; V = A.T # and making use of det(A) = 1.0/det(A^-1) with warnings.catch_warnings(): if 1 > verbose: warnings.simplefilter("ignore") # log(det(omega)) + log(det(I + A @ omega @ U)/det(omega)) \ # = log(det(I + A @ omega @ U)) _, logdetD = np.linalg.slogdet(I_D + A @ omega_ @ U) log_prob[:, i] = -0.5 * dist - 0.5 * logdetD + log_prob_constants weighted_log_prob = log_prob + np.log(pi) log_prob = logsumexp(weighted_log_prob, axis=1) with np.errstate(under="ignore"): log_tau = weighted_log_prob - log_prob[:, np.newaxis] if not full_output: return (np.sum(log_prob), np.exp(log_tau)) else: return (np.sum(log_prob), np.exp(log_tau)) def _maximization(X, tau, pi, A, xi, omega, psi, X2=None, covariance_regularization=0, missing=None, **kwargs): r""" Compute the updated estimates of the model parameters given the data, the responsibility matrix :math:`\tau`, and the current estimates of the model parameters. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param tau: The responsibility matrix, which is expected to have shape [n_samples, n_components]. The sum of each row is expected to equal one, and the value in the i-th row (sample) of the j-th column (component) indicates the partial responsibility (between zero and one) that the j-th component has for the i-th sample. :param pi: The relative weights for the components in the mixture. This should have size `n_components` and the entries should sum to one. :param A: The common factor loads between mixture components. This should have shape [n_features, n_latent_factors]. :param xi: The mean factors for the components in the mixture. This should have shape [n_latent_factors, n_components]. :param omega: The covariance matrix of the mixture components in latent space. This array should have shape [n_latent_factors, n_latent_factors, n_components]. :param psi: The variance in each dimension. This should have size [n_features]. :param X2: [optional] The square of X, :math:`X^2`. If `None` is given then this will be computed at each maximization step. :param covariance_regularization: [optional] An additive term to apply to the covariance matrices of factor scores in latent space, which aids in numerical stability. :returns: A five-length tuple containing the updated parameter estimates for the mixing weights :math:`\pi`, the common factor loads :math:`A`, the means of the components in latent space :math:`\xi`, the covariance matrices of components in latent space :math:`\omega`, and the variance in each dimension :math:`\psi`. """ if X2 is None: X2 = np.square(X) N, D = X.shape D, J = A.shape inv_D = np.diag(1.0/psi) A1 = np.zeros((D, J)) A2 = np.zeros((J, J)) Di = np.zeros(D) inv_D_A = inv_D @ A ti = np.sum(tau, axis=0).astype(float) I_J = np.eye(J) for i, tau_ in enumerate(tau[np.newaxis].T): W = linalg.solve(omega[:, :, i], I_J) C = linalg.solve(W + A.T @ inv_D_A, I_J) gamma = (inv_D - inv_D_A @ C @ inv_D_A.T) @ A @ omega[:, :, i] xi_ = np.copy(xi[:, [i]]) Y_Axi_i = X.T - A @ xi_ tY_Axi_i = Y_Axi_i * tau_.T xi[:, i] += gamma.T @ (np.sum(tY_Axi_i, axis=1) / ti[i]) diff = (xi_ - xi[:, [i]]) omega[:, :, i] = (I_J - gamma.T @ A) @ omega[:, :, i] \ + gamma.T @ Y_Axi_i @ tY_Axi_i.T @ gamma / ti[i] \ - diff @ diff.T \ + covariance_regularization * I_J A1 += np.atleast_2d(np.sum(X * tau_, axis=0)).T @ xi_.T \ + X.T @ tY_Axi_i.T @ gamma A2 += (omega[:, :, i] + xi[:, [i]] @ xi[:, [i]].T) * ti[i] A = A1 @ linalg.solve(A2, I_J) A = _post_check_factor_loads(A) Di = np.sum(X2.T @ tau, axis=1) psi = (Di - np.sum((A @ A2) * A, axis=1)) / N if _INFLATE_PSI_AT_EACH_EM_STEP: # Inflate psi according to Rubin's rule, given the missing data mask. psi = _inflate_psi(psi, missing) pi = ti / N if np.any(psi < covariance_regularization): #logger.warn(f"Setting minimum psi as {covariance_regularization:.1e} for stability") psi = np.clip(psi, covariance_regularization, np.inf) return (pi, A, xi, omega, psi) def _factor_scores(X, pi, A, xi, omega, psi, tau=None): r""" Estimate the factor scores for each data point, given the model. :param X: The data, which is expected to be an array with shape [n_samples, n_features]. :param pi: The relative weights for the components in the mixture. This should have size `n_components` and the entries should sum to one. :param A: The common factor loads between mixture components. This should have shape [n_features, n_latent_factors]. :param xi: The mean factors for the components in the mixture. This should have shape [n_latent_factors, n_components]. :param omega: The covariance matrix of the mixture components in latent space. This array should have shape [n_latent_factors, n_latent_factors, n_components]. :param psi: The variance in each dimension. This should have size [n_features]. :param tau: [optional] The responsibility matrix, which is expected to have shape [n_samples, n_components]. The sum of each row is expected to equal one, and the value in the i-th row (sample) of the j-th column (component) indicates the partial responsibility (between zero and one) that the j-th component has for the i-th sample. If this is given then the shape must match the given data points. If `None` is given then this will be calculated. # TODO: returns """ N, D = X.shape J, K = xi.shape U = np.zeros((N, J, K)) gamma = np.zeros((D, J, K)) inv_D = np.diag(1.0/psi) I = np.eye(J) for k in range(K): C = linalg.solve(linalg.solve(omega[:, :, k], I) \ + A.T @ inv_D @ A, I) gamma[:, :, k] = (inv_D - inv_D @ A @ C @ A.T @ inv_D) \ @ A @ omega[:, :, k] U[:, :, k] = np.repeat(xi[:, [k]], N).reshape((J, N)).T \ + (X - (A @ xi[:, [k]]).T) @ gamma[:, :, k] ll, tau = _expectation(X, pi, A, xi, omega, psi) cluster = np.argmax(tau, axis=1) UC = np.zeros((N, J)) Umean = np.zeros((N, J)) for i in range(N): UC[i] = U[i, :, cluster[i]] Umean[i] = tau[i] @ U[i].T return (U, UC, Umean, tau) def _compute_precision_cholesky_full(cov): r""" Compute the Cholesky decomposition of the precision of the given covariance matrix, which is expected to be a square matrix with non-zero off-diagonal terms. :param cov: The given covariance matrix. :raises scipy.linalg.LinAlgError: If the Cholesky decomposition failed. :returns: The Cholesky decomposition of the precision of the covariance matrix. """ try: cholesky_cov = linalg.cholesky(cov, lower=True) except linalg.LinAlgError: raise linalg.LinAlgError("failed to do Cholesky decomposition") # Return the precision matrix. D, _ = cov.shape return linalg.solve_triangular(cholesky_cov, np.eye(D), lower=True).T def _inflate_psi(psi, missing): r""" Inflate \psi by the fraction of missing data points according to Rubin's rule. """ return psi if missing is None \ else psi / (1 - np.sum(missing, axis=0) / missing.shape[0]) def _post_check_factor_loads(A): D, J = A.shape #A[np.triu_indices(J, 1)] = 0 #A[np.diag_indices(J)] = np.abs(A[np.diag_indices(J)]) return A