Source code for pmlearn.mixture.dirichlet_process

"""Dirichlet Process Mixture Model. """

# Authors: Daniel Emaasit <daniel.emaasit@gmail.com>
#
# License: BSD 3 clause

import numpy as np
import pymc3 as pm
import theano
import theano.tensor as tt

from ..exceptions import NotFittedError
from ..base import BayesianModel, BayesianDensityMixin
from .util import logp_gmix


class DirichletProcessMixture(BayesianModel, BayesianDensityMixin):
    """Custom Dirichlet Process Mixture Model built using PyMC3."""

    def __init__(self):
        super(DirichletProcessMixture, self).__init__()
        self.num_truncate = None
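
    # create_model expects ``num_truncate`` (the truncation level K),
    # ``num_pred`` (the feature dimension D) and ``num_training_samples``
    # to be set beforehand (normally by the mixin's ``fit``). A minimal
    # manual sketch; the attribute values here are illustrative only:
    #
    #     dpmm = DirichletProcessMixture()
    #     dpmm.num_truncate = 20
    #     dpmm.num_pred = 2
    #     dpmm.num_training_samples = 100
    #     model = dpmm.create_model()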
    def create_model(self):
        """
        Creates and returns the PyMC3 model.

        Note: The size of the shared variables must match the size of the
        training data. Otherwise, setting the shared variables later will
        raise an error. See http://docs.pymc.io/advanced_theano.html

        The DensityDist class is used as the likelihood term. Its second
        argument, logp_gmix(mus, pi, np.eye(D)), is a Python function which
        receives observations (denoted by 'value') and returns the tensor
        representation of the log-likelihood.

        Returns
        -------
        model : the PyMC3 model
        """
        model_input = theano.shared(np.zeros([self.num_training_samples,
                                              self.num_pred]))

        self.shared_vars = {
            'model_input': model_input
        }

        # The Gaussian-mixture log-likelihood (logp_gmix) is imported from
        # .util rather than defined inline.

        def stick_breaking(v):
            # Stick-breaking construction: each weight is the Beta draw v_k
            # times the portion of the stick left over by the first k - 1
            # breaks, i.e. pi_k = v_k * prod_{j<k}(1 - v_j).
            portion_remaining = tt.concatenate(
                [[1], tt.extra_ops.cumprod(1 - v)[:-1]])
            return v * portion_remaining

        model = pm.Model()

        with model:
            K = self.num_truncate
            D = self.num_pred

            alpha = pm.Gamma('alpha', 1.0, 1.0)
            v = pm.Beta('v', 1, alpha, shape=K)
            pi_ = stick_breaking(v)
            pi = pm.Deterministic('pi', pi_ / pi_.sum())

            means = tt.stack([pm.Uniform('cluster_center_{}'.format(k),
                                         lower=0.,
                                         upper=10.,
                                         shape=D) for k in range(K)])

            lower = tt.stack([pm.LKJCholeskyCov(
                'cluster_variance_{}'.format(k),
                n=D,
                eta=2.,
                sd_dist=pm.HalfNormal.dist(sd=1.)) for k in range(K)])

            chol = tt.stack([pm.expand_packed_triangular(
                D, lower[k]) for k in range(K)])

            component_dists = [pm.MvNormal(
                'component_dist_%d' % k,
                mu=means[k],
                chol=chol[k],
                shape=D) for k in range(K)]

            # Placeholder random method so DensityDist supports posterior
            # predictive sampling; it draws standard normals rather than
            # true mixture draws.
            rand = pm.Normal.dist(0, 1).random

            X = pm.DensityDist(
                'X',
                logp_gmix(
                    mus=component_dists,
                    pi=pi,
                    tau=np.eye(D),
                    num_training_samples=model_input.get_value().shape[0]),
                observed=model_input,
                random=rand)

        return model
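
    # A quick numpy sketch of the stick-breaking construction used in
    # create_model above (illustrative only; the model does this
    # symbolically with theano, and renormalizes pi to sum exactly to 1):
    #
    #     import numpy as np
    #     v = np.random.beta(1, 1.0, size=20)
    #     portion_remaining = np.concatenate([[1], np.cumprod(1 - v)[:-1]])
    #     pi = v * portion_remaining
    #     pi.sum()  # < 1; approaches 1 as the truncation level K grows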

    def predict_proba(self, X, return_std=False):
        """
        Predicts probabilities of new data with a trained Dirichlet Process
        Mixture Model.

        Parameters
        ----------
        X : numpy array, shape [n_samples, n_features]

        return_std : bool
            Whether to return standard deviations along with the mean
            probabilities. Defaults to False. Currently unused: the raw
            posterior predictive samples are returned instead.
        """
        if self.trace is None:
            raise NotFittedError('Run fit on the model before predict.')

        if self.cached_model is None:
            self.cached_model = self.create_model()

        self._set_shared_vars({'model_input': X})

        # Restrict posterior predictive sampling to the component
        # distributions; the slice assumes the fixed ordering of free RVs
        # produced by create_model.
        _vars = self.cached_model.free_RVs[8:11]
        ppc = pm.sample_ppc(self.trace,
                            model=self.cached_model,
                            vars=_vars,
                            samples=2000,
                            size=len(X))

        # TODO: once the samples are reduced to per-observation
        # probabilities, honor return_std, e.g.:
        # if return_std:
        #     return ppc['X'].mean(axis=0), ppc['X'].std(axis=0)
        # return ppc['X'].mean(axis=0)
        return ppc
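
    # Sketch of calling predict_proba on held-out data (assumes a fitted
    # model; ``sample_ppc`` returns a dict mapping each sampled variable's
    # name to an array of posterior predictive draws):
    #
    #     ppc = dpmm.predict_proba(X_new)
    #     ppc['component_dist_0'].shape  # roughly (2000, len(X_new), D)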

    def save(self, file_prefix):
        params = {
            'inference_type': self.inference_type,
            'num_pred': self.num_pred,
            'num_training_samples': self.num_training_samples
        }

        super(DirichletProcessMixture, self).save(file_prefix, params)

    def load(self, file_prefix):
        params = super(DirichletProcessMixture, self).load(
            file_prefix, load_custom_params=True)

        self.inference_type = params['inference_type']
        self.num_pred = params['num_pred']
        self.num_training_samples = params['num_training_samples']
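

# A minimal end-to-end sketch, assuming the scikit-learn style ``fit``
# provided by BayesianDensityMixin (its exact signature and defaults are
# assumptions, not part of this file):
if __name__ == '__main__':
    X_train = np.random.randn(100, 2)

    dpmm = DirichletProcessMixture()
    dpmm.fit(X_train)        # assumed mixin API; runs inference
    dpmm.save('dpmm_model')  # persists the trace and custom params

    dpmm2 = DirichletProcessMixture()
    dpmm2.load('dpmm_model')
    ppc = dpmm2.predict_proba(X_train)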