diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 9f6129d45e1..76ed533fb89 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -78,6 +78,7 @@ jobs: pymc/tests/test_transforms.py pymc/tests/test_smc.py pymc/tests/test_bart.py + pymc/tests/test_mixture.py - | pymc/tests/test_parallel_sampling.py diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 0094cc29b74..ce265b5d554 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -13,7 +13,7 @@ Instead update the vNext section until 4.0.0 is out. ### Not-yet working features We plan to get these working again, but at this point their inner workings have not been refactored. - Timeseries distributions (see [#4642](https://github.com/pymc-devs/pymc/issues/4642)) -- Mixture distributions (see [#4781](https://github.com/pymc-devs/pymc/issues/4781)) +- Nested Mixture distributions (see [#5533](https://github.com/pymc-devs/pymc/issues/5533)) - Elliptical slice sampling (see [#5137](https://github.com/pymc-devs/pymc/issues/5137)) - `BaseStochasticGradient` (see [#5138](https://github.com/pymc-devs/pymc/issues/5138)) - `pm.sample_posterior_predictive_w` (see [#4807](https://github.com/pymc-devs/pymc/issues/4807)) @@ -72,6 +72,7 @@ All of the above apply to: - In the gp.utils file, the `kmeans_inducing_points` function now passes through `kmeans_kwargs` to scipy's k-means function. - The function `replace_with_values` function has been added to `gp.utils`. - `MarginalSparse` has been renamed `MarginalApprox`. + - Removed `MixtureSameFamily`. `Mixture` is now capable of handling batched multivariate components (see [#5438](https://github.com/pymc-devs/pymc/pull/5438)). - ... ### Expected breaks diff --git a/docs/source/api/distributions/mixture.rst b/docs/source/api/distributions/mixture.rst index 50a34ff8b21..747e9c26213 100644 --- a/docs/source/api/distributions/mixture.rst +++ b/docs/source/api/distributions/mixture.rst @@ -8,4 +8,3 @@ Mixture Mixture NormalMixture - MixtureSameFamily diff --git a/pymc/distributions/__init__.py b/pymc/distributions/__init__.py index 8134addf853..ca7135e8964 100644 --- a/pymc/distributions/__init__.py +++ b/pymc/distributions/__init__.py @@ -83,7 +83,7 @@ NoDistribution, SymbolicDistribution, ) -from pymc.distributions.mixture import Mixture, MixtureSameFamily, NormalMixture +from pymc.distributions.mixture import Mixture, NormalMixture from pymc.distributions.multivariate import ( CAR, Dirichlet, @@ -180,7 +180,6 @@ "SkewNormal", "Mixture", "NormalMixture", - "MixtureSameFamily", "Triangular", "DiscreteWeibull", "Gumbel", diff --git a/pymc/distributions/distribution.py b/pymc/distributions/distribution.py index 31a9cfb9a88..daa649fd2ee 100644 --- a/pymc/distributions/distribution.py +++ b/pymc/distributions/distribution.py @@ -500,6 +500,7 @@ def __new__( rv_out = cls.change_size( rv=rv_out, new_size=resize_shape, + expand=True, ) rv_out = model.register_rv( diff --git a/pymc/distributions/mixture.py b/pymc/distributions/mixture.py index 6105bb726ca..0e06fef5f8d 100644 --- a/pymc/distributions/mixture.py +++ b/pymc/distributions/mixture.py @@ -11,21 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -from collections.abc import Iterable +import warnings import aesara import aesara.tensor as at import numpy as np -from pymc.aesaraf import _conversion_map, take_along_axis +from aeppl.abstract import MeasurableVariable, _get_measurable_outputs +from aeppl.logprob import _logprob +from aesara.compile.builders import OpFromGraph +from aesara.tensor import TensorVariable +from aesara.tensor.random.op import RandomVariable + +from pymc.aesaraf import change_rv_size from pymc.distributions.continuous import Normal, get_tau_sigma from pymc.distributions.dist_math import check_parameters -from pymc.distributions.distribution import Discrete, Distribution +from pymc.distributions.distribution import ( + Discrete, + Distribution, + SymbolicDistribution, + _get_moment, + get_moment, +) +from pymc.distributions.logprob import logp from pymc.distributions.shape_utils import to_tuple -from pymc.math import logsumexp +from pymc.util import check_dist_not_registered -__all__ = ["Mixture", "NormalMixture", "MixtureSameFamily"] +__all__ = ["Mixture", "NormalMixture"] def all_discrete(comp_dists): @@ -38,7 +50,14 @@ def all_discrete(comp_dists): return all(isinstance(comp_dist, Discrete) for comp_dist in comp_dists) -class Mixture(Distribution): +class MarginalMixtureRV(OpFromGraph): + """A placeholder used to specify a log-likelihood for a mixture sub-graph.""" + + +MeasurableVariable.register(MarginalMixtureRV) + + +class Mixture(SymbolicDistribution): R""" Mixture log-likelihood @@ -53,516 +72,357 @@ class Mixture(Distribution): Parameters ---------- - w: array of floats + w : tensor_like of float w >= 0 and w <= 1 the mixture weights - comp_dists: multidimensional PyMC distribution (e.g. `pm.Poisson.dist(...)`) - or iterable of PyMC distributions the component distributions + comp_dists : iterable of PyMC distributions or single batched distribution + Distributions should be created via the `.dist()` API. If single distribution is + passed, the last size dimension (not shape) determines the number of mixture + components (e.g. `pm.Poisson.dist(..., size=components)`) :math:`f_1, \ldots, f_n` Examples -------- .. code-block:: python - # 2-Mixture Poisson distribution + # Mixture of 2 Poisson variables with pm.Model() as model: - lam = pm.Exponential('lam', lam=1, shape=(2,)) # `shape=(2,)` indicates two mixture components. + w = pm.Dirichlet('w', a=np.array([1, 1])) # 2 mixture weights - # As we just need the logp, rather than add a RV to the model, we need to call .dist() - components = pm.Poisson.dist(mu=lam, shape=(2,)) + lam1 = pm.Exponential('lam1', lam=1) + lam2 = pm.Exponential('lam2', lam=1) - w = pm.Dirichlet('w', a=np.array([1, 1])) # two mixture component weights. + # As we just need the logp, rather than add a RV to the model, we need to call `.dist()` + # These two forms are equivalent, but the second benefits from vectorization + components = [ + pm.Poisson.dist(mu=lam1), + pm.Poisson.dist(mu=lam2), + ] + # `shape=(2,)` indicates 2 mixture components + components = pm.Poisson.dist(mu=pm.math.stack([lam1, lam2]), shape=(2,)) like = pm.Mixture('like', w=w, comp_dists=components, observed=data) - # 2-Mixture Poisson using iterable of distributions. - with pm.Model() as model: - lam1 = pm.Exponential('lam1', lam=1) - lam2 = pm.Exponential('lam2', lam=1) - pois1 = pm.Poisson.dist(mu=lam1) - pois2 = pm.Poisson.dist(mu=lam2) + .. 
code-block:: python - w = pm.Dirichlet('w', a=np.array([1, 1])) + # Mixture of Normal and StudentT variables + with pm.Model() as model: + w = pm.Dirichlet('w', a=np.array([1, 1])) # 2 mixture weights - like = pm.Mixture('like', w=w, comp_dists = [pois1, pois2], observed=data) + mu = pm.Normal("mu", 0, 1) - # npop-Mixture of multidimensional Gaussian - npop = 5 - nd = (3, 4) - with pm.Model() as model: - mu = pm.Normal('mu', mu=np.arange(npop), sigma=1, shape=npop) # Each component has an independent mean + components = [ + pm.Normal.dist(mu=mu, sigma=1), + pm.StudentT.dist(nu=4, mu=mu, sigma=1), + ] - w = pm.Dirichlet('w', a=np.ones(npop)) + like = pm.Mixture('like', w=w, comp_dists=components, observed=data) - components = pm.Normal.dist(mu=mu, sigma=1, shape=nd + (npop,)) # nd + (npop,) shaped multinomial - like = pm.Mixture('like', w=w, comp_dists = components, observed=data, shape=nd) # The resulting mixture is nd-shaped + .. code-block:: python - # Multidimensional Mixture as stacked independent mixtures + # Mixture of (5 x 3) Normal variables with pm.Model() as model: - mu = pm.Normal('mu', mu=np.arange(5), sigma=1, shape=5) # Each component has an independent mean + # w is a stack of 5 independent size 3 weight vectors + # If shape was `(3,)`, the weights would be shared across the 5 replication dimensions + w = pm.Dirichlet('w', a=np.ones(3), shape=(5, 3)) + + # Each of the 3 mixture components has an independent mean + mu = pm.Normal('mu', mu=np.arange(3), sigma=1, shape=3) + + # These two forms are equivalent, but the second benefits from vectorization + components = [ + pm.Normal.dist(mu=mu[0], sigma=1, shape=(5,)), + pm.Normal.dist(mu=mu[1], sigma=1, shape=(5,)), + pm.Normal.dist(mu=mu[2], sigma=1, shape=(5,)), + ] + components = pm.Normal.dist(mu=mu, sigma=1, shape=(5, 3)) + + # The mixture is an array of 5 elements + # Each element can be thought of as an independent scalar mixture of 3 + # components with different means + like = pm.Mixture('like', w=w, comp_dists=components, observed=data) + - w = pm.Dirichlet('w', a=np.ones(3, 5)) # w is a stack of 3 independent 5 component weight arrays + .. code-block:: python + + # Mixture of 2 Dirichlet variables + with pm.Model() as model: + w = pm.Dirichlet('w', a=np.ones(2)) # 2 mixture weights - components = pm.Normal.dist(mu=mu, sigma=1, shape=(3, 5)) + # These two forms are equivalent, but the second benefits from vectorization + components = [ + pm.Dirichlet.dist(a=[1, 10, 100], shape=(3,)), + pm.Dirichlet.dist(a=[100, 10, 1], shape=(3,)), + ] + components = pm.Dirichlet.dist(a=[[1, 10, 100], [100, 10, 1]], shape=(2, 3)) - # The mixture is an array of 3 elements. 
- # Each can be thought of as an independent scalar mixture of 5 components - like = pm.Mixture('like', w=w, comp_dists = components, observed=data, shape=3) + # The mixture is an array of 3 elements + # Each element comes from only one of the two core Dirichlet components + like = pm.Mixture('like', w=w, comp_dists=components, observed=data) """ - def __init__(self, w, comp_dists, *args, **kwargs): - # comp_dists type checking - if not ( - isinstance(comp_dists, Distribution) - or ( - isinstance(comp_dists, Iterable) - and all(isinstance(c, Distribution) for c in comp_dists) + @classmethod + def dist(cls, w, comp_dists, **kwargs): + if not isinstance(comp_dists, (tuple, list)): + # comp_dists is a single component + comp_dists = [comp_dists] + elif len(comp_dists) == 1: + warnings.warn( + "Single component will be treated as a mixture across the last size dimension.\n" + "To disable this warning do not wrap the single component inside a list or tuple", + UserWarning, ) - ): - raise TypeError( - "Supplied Mixture comp_dists must be a " - "Distribution or an iterable of " - "Distributions. Got {} instead.".format( - type(comp_dists) - if not isinstance(comp_dists, Iterable) - else [type(c) for c in comp_dists] + + # Check that components are not associated with a registered variable in the model + components_ndim = set() + components_ndim_supp = set() + for dist in comp_dists: + # TODO: Allow these to not be a RandomVariable as long as we can call `ndim_supp` on them + # and resize them + if not isinstance(dist, TensorVariable) or not isinstance( + dist.owner.op, RandomVariable + ): + raise ValueError( + f"Component dist must be a distribution created via the `.dist()` API, got {type(dist)}" ) + check_dist_not_registered(dist) + components_ndim.add(dist.ndim) + components_ndim_supp.add(dist.owner.op.ndim_supp) + + if len(components_ndim) > 1: + raise ValueError( + f"Mixture components must all have the same dimensionality, got {components_ndim}" ) - shape = kwargs.pop("shape", ()) - self.w = w = at.as_tensor_variable(w) - self.comp_dists = comp_dists + if len(components_ndim_supp) > 1: + raise ValueError( + f"Mixture components must all have the same support dimensionality, got {components_ndim_supp}" + ) - defaults = kwargs.pop("defaults", []) + w = at.as_tensor_variable(w) + return super().dist([w, *comp_dists], **kwargs) - if all_discrete(comp_dists): - default_dtype = _conversion_map[aesara.config.floatX] - else: - default_dtype = aesara.config.floatX - - try: - self.mean = (w * self._comp_means()).sum(axis=-1) - - if "mean" not in defaults: - defaults.append("mean") - except AttributeError: - pass - dtype = kwargs.pop("dtype", default_dtype) - - try: - if isinstance(comp_dists, Distribution): - comp_mode_logps = comp_dists.logp(comp_dists.mode) - else: - comp_mode_logps = at.stack([cd.logp(cd.mode) for cd in comp_dists]) - - mode_idx = at.argmax(at.log(w) + comp_mode_logps, axis=-1) - self.mode = self._comp_modes()[mode_idx] - - if "mode" not in defaults: - defaults.append("mode") - except (AttributeError, ValueError, IndexError): - pass - - super().__init__(shape, dtype, defaults=defaults, *args, **kwargs) - - @property - def comp_dists(self): - return self._comp_dists - - @comp_dists.setter - def comp_dists(self, comp_dists): - self._comp_dists = comp_dists - if isinstance(comp_dists, Distribution): - self._comp_dist_shapes = to_tuple(comp_dists.shape) - self._broadcast_shape = self._comp_dist_shapes - self.comp_is_distribution = True + @classmethod + def rv_op(cls, weights, 
*components, size=None, rngs=None): + # Update rngs if provided + if rngs is not None: + components = cls._reseed_components(rngs, *components) + *_, mix_indexes_rng = rngs else: - # Now we check the comp_dists distribution shape, see what - # the broadcast shape would be. This shape will be the dist_shape - # used by generate samples (the shape of a single random sample) - # from the mixture - self._comp_dist_shapes = [to_tuple(d.shape) for d in comp_dists] - # All component distributions must broadcast with each other - try: - self._broadcast_shape = np.broadcast( - *(np.empty(shape) for shape in self._comp_dist_shapes) - ).shape - except Exception: - raise TypeError( - "Supplied comp_dists shapes do not broadcast " - "with each other. comp_dists shapes are: " - "{}".format(self._comp_dist_shapes) - ) + # Create new rng for the mix_indexes internal RV + mix_indexes_rng = aesara.shared(np.random.default_rng()) + + if size is not None: + components = cls._resize_components(size, *components) + + single_component = len(components) == 1 + + # Extract support and replication ndims from components and weights + ndim_supp = components[0].owner.op.ndim_supp + ndim_batch = components[0].ndim - ndim_supp + if single_component: + # One dimension is taken by the mixture axis in the single component case + ndim_batch -= 1 + + # The weights may imply extra batch dimensions that go beyond what is already + # implied by the component dimensions (ndim_batch) + weights_ndim_batch = max(0, weights.ndim - ndim_batch - 1) + + # If weights are large enough that they would broadcast the component distributions + # we try to resize them. This is necessary to avoid duplicated values in the + # random method and for equivalency with the logp method + if weights_ndim_batch: + new_size = at.concatenate( + [ + weights.shape[:weights_ndim_batch], + components[0].shape[:ndim_batch], + ] + ) + components = cls._resize_components(new_size, *components) - # We wrap the _comp_dist.random by adding the kwarg raw_size_, - # which will be the size attribute passed to _comp_samples. - # _comp_samples then calls generate_samples, which may change the - # size value to make it compatible with scipy.stats.*.rvs - self._generators = [] - for comp_dist in comp_dists: - generator = Mixture._comp_dist_random_wrapper(comp_dist.random) - self._generators.append(generator) - self.comp_is_distribution = False - - @staticmethod - def _comp_dist_random_wrapper(random): - """Wrap the comp_dists.random method to take the kwarg raw_size_ and - use it's value to replace the size parameter. This is needed because - generate_samples makes the size value compatible with the - scipy.stats.*.rvs, where size has a different meaning than in the - distributions' random methods. - """ - - def wrapped_random(*args, **kwargs): - raw_size_ = kwargs.pop("raw_size_", None) - # Distribution.random's signature is always (point=None, size=None) - # so size could be the second arg or be given as a kwarg - if len(args) > 1: - args[1] = raw_size_ - else: - kwargs["size"] = raw_size_ - return random(*args, **kwargs) - - return wrapped_random - - def _comp_logp(self, value): - comp_dists = self.comp_dists - - if self.comp_is_distribution: - # Value can be many things. It can be the self tensor, the mode - # test point or it can be observed data. The latter case requires - # careful handling of shape, as the observed's shape could look - # like (repetitions,) + dist_shape, which does not include the last - # mixture axis. 
For this reason, we try to eval the value.shape, - # compare it with self.shape and shape_padright if we infer that - # the value holds observed data - try: - val_shape = tuple(value.shape.eval()) - except AttributeError: - val_shape = value.shape - except aesara.graph.fg.MissingInputError: - val_shape = None - try: - self_shape = tuple(self.shape) - except AttributeError: - # Happens in __init__ when computing self.logp(comp_modes) - self_shape = None - comp_shape = tuple(comp_dists.shape) - ndim = value.ndim - if val_shape is not None and not ( - (self_shape is not None and val_shape == self_shape) or val_shape == comp_shape - ): - # value is neither the test point nor the self tensor, it - # is likely to hold observed values, so we must compute the - # ndim discarding the dimensions that don't match - # self_shape - if self_shape and val_shape[-len(self_shape) :] == self_shape: - # value has observed values for the Mixture - ndim = len(self_shape) - elif comp_shape and val_shape[-len(comp_shape) :] == comp_shape: - # value has observed for the Mixture components - ndim = len(comp_shape) - else: - # We cannot infer what was passed, we handle this - # as was done in earlier versions of Mixture. We pad - # always if ndim is lower or equal to 1 (default - # legacy implementation) - if ndim <= 1: - ndim = len(comp_dists.shape) - 1 - else: - # We reach this point if value does not hold observed data, so - # we can use its ndim safely to determine shape padding, or it - # holds something that we cannot infer, so we revert to using - # the value's ndim for shape padding. - # We will always pad a single dimension if ndim is lower or - # equal to 1 (default legacy implementation) - if ndim <= 1: - ndim = len(comp_dists.shape) - 1 - if ndim < len(comp_dists.shape): - value_ = at.shape_padright(value, len(comp_dists.shape) - ndim) - else: - value_ = value - return comp_dists.logp(value_) + # Extract support and batch ndims from components and weights + ndim_batch = components[0].ndim - ndim_supp + if single_component: + ndim_batch -= 1 + weights_ndim_batch = max(0, weights.ndim - ndim_batch - 1) + + assert weights_ndim_batch == 0 + + # Create an OpFromGraph that encapsulates the random generating process + # Create dummy input variables with the same type as the ones provided + weights_ = weights.type() + components_ = [component.type() for component in components] + mix_indexes_rng_ = mix_indexes_rng.type() + + mix_axis = -ndim_supp - 1 + + # Stack components across mixture axis + if single_component: + # If single component, we consider it as being already "stacked" + stacked_components_ = components_[0] else: - return at.squeeze( - at.stack([comp_dist.logp(value) for comp_dist in comp_dists], axis=-1) + stacked_components_ = at.stack(components_, axis=mix_axis) + + # Broadcast weights to (*batched dimensions, stack dimension), ignoring support dimensions + weights_broadcast_shape_ = stacked_components_.shape[: ndim_batch + 1] + weights_broadcasted_ = at.broadcast_to(weights_, weights_broadcast_shape_) + + # Draw mixture indexes and append (stack + ndim_supp) broadcastable dimensions to the right + mix_indexes_ = at.random.categorical(weights_broadcasted_, rng=mix_indexes_rng_) + mix_indexes_padded_ = at.shape_padright(mix_indexes_, ndim_supp + 1) + + # Index components and squeeze mixture dimension + mix_out_ = at.take_along_axis(stacked_components_, mix_indexes_padded_, axis=mix_axis) + # There is an Aesara bug in squeeze with negative axis + # this is equivalent to np.squeeze(mix_out_, 
axis=mix_axis) + mix_out_ = at.squeeze(mix_out_, axis=mix_out_.ndim + mix_axis) + + # Output mix_indexes rng update so that it can be updated in place + mix_indexes_rng_next_ = mix_indexes_.owner.outputs[0] + + mix_op = MarginalMixtureRV( + inputs=[mix_indexes_rng_, weights_, *components_], + outputs=[mix_indexes_rng_next_, mix_out_], + ) + + # Create the actual MarginalMixture variable + mix_indexes_rng_next, mix_out = mix_op(mix_indexes_rng, weights, *components) + + # We need to set_default_updates ourselves, because the choices RV is hidden + # inside OpFromGraph and PyMC will never find it otherwise + mix_indexes_rng.default_update = mix_indexes_rng_next + + # Reference nodes to facilitate identification in other classmethods + mix_out.tag.weights = weights + mix_out.tag.components = components + mix_out.tag.choices_rng = mix_indexes_rng + + # Component RV terms are accounted for by the Mixture logprob, so they can be + # safely ignored by Aeppl (this tag prevents UserWarning) + for component in components: + component.tag.ignore_logprob = True + + return mix_out + + @classmethod + def _reseed_components(cls, rngs, *components): + *components_rngs, mix_indexes_rng = rngs + assert len(components) == len(components_rngs) + new_components = [] + for component, component_rng in zip(components, components_rngs): + component_node = component.owner + old_rng, *inputs = component_node.inputs + new_components.append( + component_node.op.make_node(component_rng, *inputs).default_output() ) + return new_components + + @classmethod + def _resize_components(cls, size, *components): + if len(components) == 1: + # If we have a single component, we need to keep the length of the mixture + # axis intact, because that's what determines the number of mixture components + mix_axis = -components[0].owner.op.ndim_supp - 1 + mix_size = components[0].shape[mix_axis] + size = tuple(size) + (mix_size,) + + return [change_rv_size(component, size) for component in components] + + @classmethod + def ndim_supp(cls, weights, *components): + # We already checked that all components have the same support dimensionality + return components[0].owner.op.ndim_supp + + @classmethod + def change_size(cls, rv, new_size, expand=False): + weights = rv.tag.weights + components = rv.tag.components + rngs = [component.owner.inputs[0] for component in components] + [rv.tag.choices_rng] + + if expand: + component = rv.tag.components[0] + # Old size is equal to `shape[:-ndim_supp]`, with care needed for `ndim_supp == 0` + size_dims = component.ndim - component.owner.op.ndim_supp + if len(rv.tag.components) == 1: + # If we have a single component, new size should ignore the mixture axis + # dimension, as that is not touched by `_resize_components` + size_dims -= 1 + old_size = components[0].shape[:size_dims] + new_size = to_tuple(new_size) + tuple(old_size) + + components = cls._resize_components(new_size, *components) + + return cls.rv_op(weights, *components, rngs=rngs, size=None) + + @classmethod + def graph_rvs(cls, rv): + # We return rv, which is itself a pseudo RandomVariable, that contains a + # mix_indexes_ RV in its inner graph. 
We want super().dist() to generate + # (components + 1) rngs for us, and it will do so based on how many elements + # we return here + return (*rv.tag.components, rv) + + +@_get_measurable_outputs.register(MarginalMixtureRV) +def _get_measurable_outputs_MarginalMixtureRV(op, node): + # This tells Aeppl that the second output is the measurable one + return [node.outputs[1]] + + +@_logprob.register(MarginalMixtureRV) +def marginal_mixture_logprob(op, values, rng, weights, *components, **kwargs): + (value,) = values + + # single component + if len(components) == 1: + # Need to broadcast value across mixture axis + mix_axis = -components[0].owner.op.ndim_supp - 1 + components_logp = logp(components[0], at.expand_dims(value, mix_axis)) + else: + components_logp = at.stack( + [logp(component, value) for component in components], + axis=-1, + ) + + mix_logp = at.logsumexp(at.log(weights) + components_logp, axis=-1) + + # Squeeze stack dimension + # There is an Aesara bug in squeeze with negative axis + # mix_logp = at.squeeze(mix_logp, axis=-1) + mix_logp = at.squeeze(mix_logp, axis=mix_logp.ndim - 1) + + mix_logp = check_parameters( + mix_logp, + 0 <= weights, + weights <= 1, + at.isclose(at.sum(weights, axis=-1), 1), + msg="0 <= weights <= 1, sum(weights) == 1", + ) + + return mix_logp + + +@_get_moment.register(MarginalMixtureRV) +def get_moment_marginal_mixture(op, rv, rng, weights, *components): + ndim_supp = components[0].owner.op.ndim_supp + weights = at.shape_padright(weights, ndim_supp) + mix_axis = -ndim_supp - 1 + + if len(components) == 1: + moment_components = get_moment(components[0]) - def _comp_means(self): - try: - return at.as_tensor_variable(self.comp_dists.mean) - except AttributeError: - return at.squeeze(at.stack([comp_dist.mean for comp_dist in self.comp_dists], axis=-1)) - - def _comp_modes(self): - try: - return at.as_tensor_variable(self.comp_dists.mode) - except AttributeError: - return at.squeeze(at.stack([comp_dist.mode for comp_dist in self.comp_dists], axis=-1)) - - def _comp_samples(self, point=None, size=None, comp_dist_shapes=None, broadcast_shape=None): - # if self.comp_is_distribution: - # samples = self._comp_dists.random(point=point, size=size) - # else: - # if comp_dist_shapes is None: - # comp_dist_shapes = self._comp_dist_shapes - # if broadcast_shape is None: - # broadcast_shape = self._sample_shape - # samples = [] - # for dist_shape, generator in zip(comp_dist_shapes, self._generators): - # sample = generate_samples( - # generator=generator, - # dist_shape=dist_shape, - # broadcast_shape=broadcast_shape, - # point=point, - # size=size, - # not_broadcast_kwargs={"raw_size_": size}, - # ) - # samples.append(sample) - # samples = np.array(broadcast_distribution_samples(samples, size=size)) - # # In the logp we assume the last axis holds the mixture components - # # so we move the axis to the last dimension - # samples = np.moveaxis(samples, 0, -1) - # return samples.astype(self.dtype) - pass - - def infer_comp_dist_shapes(self, point=None): - """Try to infer the shapes of the component distributions, - `comp_dists`, and how they should broadcast together. - The behavior is slightly different if `comp_dists` is a `Distribution` - as compared to when it is a list of `Distribution`s. When it is a list - the following procedure is repeated for each element in the list: - 1. Look up the `comp_dists.shape` - 2. If it is not empty, use it as `comp_dist_shape` - 3. 
If it is an empty tuple, a single random sample is drawn by calling - `comp_dists.random(point=point, size=None)`, and the returned - test_sample's shape is used as the inferred `comp_dists.shape` - - Parameters - ---------- - point: None or dict (optional) - Dictionary that maps rv names to values, to supply to - `self.comp_dists.random` - - Returns - ------- - comp_dist_shapes: shape tuple or list of shape tuples. - If `comp_dists` is a `Distribution`, it is a shape tuple of the - inferred distribution shape. - If `comp_dists` is a list of `Distribution`s, it is a list of - shape tuples inferred for each element in `comp_dists` - broadcast_shape: shape tuple - The shape that results from broadcasting all component's shapes - together. - """ - # if self.comp_is_distribution: - # if len(self._comp_dist_shapes) > 0: - # comp_dist_shapes = self._comp_dist_shapes - # else: - # # Happens when the distribution is a scalar or when it was not - # # given a shape. In these cases we try to draw a single value - # # to check its shape, we use the provided point dictionary - # # hoping that it can circumvent the Flat and HalfFlat - # # undrawable distributions. - # with _DrawValuesContextBlocker(): - # test_sample = self._comp_dists.random(point=point, size=None) - # comp_dist_shapes = test_sample.shape - # broadcast_shape = comp_dist_shapes - # else: - # # Now we check the comp_dists distribution shape, see what - # # the broadcast shape would be. This shape will be the dist_shape - # # used by generate samples (the shape of a single random sample) - # # from the mixture - # comp_dist_shapes = [] - # for dist_shape, comp_dist in zip(self._comp_dist_shapes, self._comp_dists): - # if dist_shape == tuple(): - # # Happens when the distribution is a scalar or when it was - # # not given a shape. In these cases we try to draw a single - # # value to check its shape, we use the provided point - # # dictionary hoping that it can circumvent the Flat and - # # HalfFlat undrawable distributions. - # with _DrawValuesContextBlocker(): - # test_sample = comp_dist.random(point=point, size=None) - # dist_shape = test_sample.shape - # comp_dist_shapes.append(dist_shape) - # # All component distributions must broadcast with each other - # try: - # broadcast_shape = np.broadcast( - # *[np.empty(shape) for shape in comp_dist_shapes] - # ).shape - # except Exception: - # raise TypeError( - # "Inferred comp_dist shapes do not broadcast " - # "with each other. comp_dists inferred shapes " - # "are: {}".format(comp_dist_shapes) - # ) - # return comp_dist_shapes, broadcast_shape - - def logp(self, value): - """ - Calculate log-probability of defined Mixture distribution at specified value. - - Parameters - ---------- - value: numeric - Value(s) for which log-probability is calculated. If the log probabilities for multiple - values are desired the values must be provided in a numpy array or Aesara tensor - - Returns - ------- - TensorVariable - """ - w = self.w - - return check_parameters( - logsumexp(at.log(w) + self._comp_logp(value), axis=-1, keepdims=False), - w >= 0, - w <= 1, - at.allclose(w.sum(axis=-1), 1), - broadcast_conditions=False, + else: + moment_components = at.stack( + [get_moment(component) for component in components], + axis=mix_axis, ) - def random(self, point=None, size=None): - """ - Draw random values from defined Mixture distribution. - - Parameters - ---------- - point: dict, optional - Dict of variable values on which random values are to be - conditioned (uses default point if not specified). 
- size: int, optional - Desired size of random sample (returns one sample if not - specified). - - Returns - ------- - array - """ - # # Convert size to tuple - # size = to_tuple(size) - # # Draw mixture weights and infer the comp_dists shapes - # with _DrawValuesContext() as draw_context: - # # We first need to check w and comp_tmp shapes and re compute size - # w = draw_values([self.w], point=point, size=size)[0] - # comp_dist_shapes, broadcast_shape = self.infer_comp_dist_shapes(point=point) - # - # # When size is not None, it's hard to tell the w parameter shape - # if size is not None and w.shape[: len(size)] == size: - # w_shape = w.shape[len(size) :] - # else: - # w_shape = w.shape - # - # # Try to determine parameter shape and dist_shape - # if self.comp_is_distribution: - # param_shape = np.broadcast(np.empty(w_shape), np.empty(broadcast_shape)).shape - # else: - # param_shape = np.broadcast(np.empty(w_shape), np.empty(broadcast_shape + (1,))).shape - # if np.asarray(self.shape).size != 0: - # dist_shape = np.broadcast(np.empty(self.shape), np.empty(param_shape[:-1])).shape - # else: - # dist_shape = param_shape[:-1] - # - # # Try to determine the size that must be used to get the mixture - # # components (i.e. get random choices using w). - # # 1. There must be size independent choices based on w. - # # 2. There must also be independent draws for each non singleton axis - # # of w. - # # 3. There must also be independent draws for each dimension added by - # # self.shape with respect to the w.ndim. These usually correspond to - # # observed variables with batch shapes - # wsh = (1,) * (len(dist_shape) - len(w_shape) + 1) + w_shape[:-1] - # psh = (1,) * (len(dist_shape) - len(param_shape) + 1) + param_shape[:-1] - # w_sample_size = [] - # # Loop through the dist_shape to get the conditions 2 and 3 first - # for i in range(len(dist_shape)): - # if dist_shape[i] != psh[i] and wsh[i] == 1: - # # self.shape[i] is a non singleton dimension (usually caused by - # # observed data) - # sh = dist_shape[i] - # else: - # sh = wsh[i] - # w_sample_size.append(sh) - # if size is not None and w_sample_size[: len(size)] != size: - # w_sample_size = size + tuple(w_sample_size) - # # Broadcast w to the w_sample_size (add a singleton last axis for the - # # mixture components) - # w = broadcast_distribution_samples([w, np.empty(w_sample_size + (1,))], size=size)[0] - # - # # Semiflatten the mixture weights. 
The last axis is the number of - # # mixture mixture components, and the rest is all about size, - # # dist_shape and broadcasting - # w_ = np.reshape(w, (-1, w.shape[-1])) - # w_samples = random_choice(p=w_, size=None) # w's shape already includes size - # # Now we broadcast the chosen components to the dist_shape - # w_samples = np.reshape(w_samples, w.shape[:-1]) - # if size is not None and dist_shape[: len(size)] != size: - # w_samples = np.broadcast_to(w_samples, size + dist_shape) - # else: - # w_samples = np.broadcast_to(w_samples, dist_shape) - # - # # When size is not None, maybe dist_shape partially overlaps with size - # if size is not None: - # if size == dist_shape: - # size = None - # elif size[-len(dist_shape) :] == dist_shape: - # size = size[: len(size) - len(dist_shape)] - # - # # We get an integer _size instead of a tuple size for drawing the - # # mixture, then we just reshape the output - # if size is None: - # _size = None - # else: - # _size = int(np.prod(size)) - # - # # Compute the total size of the mixture's random call with size - # if _size is not None: - # output_size = int(_size * np.prod(dist_shape) * param_shape[-1]) - # else: - # output_size = int(np.prod(dist_shape) * param_shape[-1]) - # # Get the size we need for the mixture's random call - # if self.comp_is_distribution: - # mixture_size = int(output_size // np.prod(broadcast_shape)) - # else: - # mixture_size = int(output_size // (np.prod(broadcast_shape) * param_shape[-1])) - # if mixture_size == 1 and _size is None: - # mixture_size = None - # - # # Sample from the mixture - # with draw_context: - # mixed_samples = self._comp_samples( - # point=point, - # size=mixture_size, - # broadcast_shape=broadcast_shape, - # comp_dist_shapes=comp_dist_shapes, - # ) - # # Test that the mixture has the same number of "samples" as w - # if w_samples.size != (mixed_samples.size // w.shape[-1]): - # raise ValueError( - # "Inconsistent number of samples from the " - # "mixture and mixture weights. Drew {} mixture " - # "weights elements, and {} samples from the " - # "mixture components.".format(w_samples.size, mixed_samples.size // w.shape[-1]) - # ) - # # Semiflatten the mixture to be able to zip it with w_samples - # w_samples = w_samples.flatten() - # mixed_samples = np.reshape(mixed_samples, (-1, w.shape[-1])) - # # Select the samples from the mixture - # samples = np.array([mixed[choice] for choice, mixed in zip(w_samples, mixed_samples)]) - # # Reshape the samples to the correct output shape - # if size is None: - # samples = np.reshape(samples, dist_shape) - # else: - # samples = np.reshape(samples, size + dist_shape) - # return samples - - def _distr_parameters_for_repr(self): - return [] - - -class NormalMixture(Mixture): + return at.sum(weights * moment_components, axis=mix_axis) + + +class NormalMixture: R""" Normal mixture log-likelihood @@ -578,18 +438,18 @@ class NormalMixture(Mixture): Parameters ---------- - w: array of floats + w : tensor_like of float w >= 0 and w <= 1 the mixture weights - mu: array of floats + mu : tensor_like of float the component means - sigma: array of floats + sigma : tensor_like of float the component standard deviations - tau: array of floats + tau : tensor_like of float the component precisions - comp_shape: shape of the Normal component + comp_shape : shape of the Normal component notice that it should be different than the shape - of the mixture distribution, with one axis being + of the mixture distribution, with the last axis representing the number of components. 
Notes @@ -605,259 +465,29 @@ class NormalMixture(Mixture): with pm.Model() as gauss_mix: μ = pm.Normal( "μ", - data.mean(), - 10, + mu=data.mean(), + sigma=10, shape=n_components, transform=pm.transforms.ordered, initval=[1, 2, 3], ) - σ = pm.HalfNormal("σ", 10, shape=n_components) + σ = pm.HalfNormal("σ", sigma=10, shape=n_components) weights = pm.Dirichlet("w", np.ones(n_components)) - pm.NormalMixture("y", w=weights, mu=μ, sigma=σ, observed=data) + y = pm.NormalMixture("y", w=weights, mu=μ, sigma=σ, observed=data) """ - def __init__(self, w, mu, sigma=None, tau=None, sd=None, comp_shape=(), *args, **kwargs): + def __new__(cls, name, w, mu, sigma=None, tau=None, sd=None, comp_shape=(), **kwargs): if sd is not None: sigma = sd _, sigma = get_tau_sigma(tau=tau, sigma=sigma) - self.mu = mu = at.as_tensor_variable(mu) - self.sigma = self.sd = sigma = at.as_tensor_variable(sigma) - - super().__init__(w, Normal.dist(mu, sigma=sigma, shape=comp_shape), *args, **kwargs) - - def _distr_parameters_for_repr(self): - return ["w", "mu", "sigma"] - - -class MixtureSameFamily(Distribution): - R""" - Mixture Same Family log-likelihood - This distribution handles mixtures of multivariate distributions in a vectorized - manner. It is used over Mixture distribution when the mixture components are not - present on the last axis of components' distribution. - - .. math::f(x \mid w, \theta) = \sum_{i = 1}^n w_i f_i(x \mid \theta_i)\textrm{ Along mixture\_axis} - - ======== ============================================ - Support :math:`\textrm{support}(f)` - Mean :math:`w\mu` - ======== ============================================ - - Parameters - ---------- - w: array of floats - w >= 0 and w <= 1 - the mixture weights - comp_dists: PyMC distribution (e.g. `pm.Multinomial.dist(...)`) - The `comp_dists` can be scalar or multidimensional distribution. - Assuming its shape to be - (i_0, ..., i_n, mixture_axis, i_n+1, ..., i_N), - the `mixture_axis` is consumed resulting in the shape of mixture as - - (i_0, ..., i_n, i_n+1, ..., i_N). - mixture_axis: int, default = -1 - Axis representing the mixture components to be reduced in the mixture. - - Notes - ----- - The default behaviour resembles Mixture distribution wherein the last axis of component - distribution is reduced. - """ + return Mixture(name, w, Normal.dist(mu, sigma=sigma, size=comp_shape), **kwargs) - def __init__(self, w, comp_dists, mixture_axis=-1, *args, **kwargs): - self.w = at.as_tensor_variable(w) - if not isinstance(comp_dists, Distribution): - raise TypeError( - "The MixtureSameFamily distribution only accepts Distribution " - f"instances as its components. Got {type(comp_dists)} instead." - ) - self.comp_dists = comp_dists - if mixture_axis < 0: - mixture_axis = len(comp_dists.shape) + mixture_axis - if mixture_axis < 0: - raise ValueError( - "`mixture_axis` is supposed to be in shape of components' distribution. " - f"Got {mixture_axis + len(comp_dists.shape)} axis instead out of the bounds." 
- ) - comp_shape = to_tuple(comp_dists.shape) - self.shape = comp_shape[:mixture_axis] + comp_shape[mixture_axis + 1 :] - self.mixture_axis = mixture_axis - kwargs.setdefault("dtype", self.comp_dists.dtype) - - # Compute the mode so we don't always have to pass a initval - defaults = kwargs.pop("defaults", []) - event_shape = self.comp_dists.shape[mixture_axis + 1 :] - _w = at.shape_padleft( - at.shape_padright(w, len(event_shape)), - len(self.comp_dists.shape) - w.ndim - len(event_shape), - ) - mode = take_along_axis( - self.comp_dists.mode, - at.argmax(_w, keepdims=True), - axis=mixture_axis, - ) - self.mode = mode[(..., 0) + (slice(None),) * len(event_shape)] - - if not all_discrete(comp_dists): - mean = at.as_tensor_variable(self.comp_dists.mean) - self.mean = (_w * mean).sum(axis=mixture_axis) - if "mean" not in defaults: - defaults.append("mean") - defaults.append("mode") - - super().__init__(defaults=defaults, *args, **kwargs) - - def logp(self, value): - """ - Calculate log-probability of defined ``MixtureSameFamily`` distribution at specified value. - - Parameters - ---------- - value : numeric - Value(s) for which log-probability is calculated. If the log probabilities for multiple - values are desired the values must be provided in a numpy array or Aesara tensor - - Returns - ------- - TensorVariable - """ - - comp_dists = self.comp_dists - w = self.w - mixture_axis = self.mixture_axis - - event_shape = comp_dists.shape[mixture_axis + 1 :] - - # To be able to broadcast the comp_dists.logp with w and value - # We first have to pad the shape of w to the right with ones - # so that it can broadcast with the event_shape. - - w = at.shape_padright(w, len(event_shape)) - - # Second, we have to add the mixture_axis to the value tensor - # To insert the mixture axis at the correct location, we use the - # negative number index. This way, we can also handle situations - # in which, value is an observed value with more batch dimensions - # than the ones present in the comp_dists. - comp_dists_ndim = len(comp_dists.shape) - - value = at.shape_padaxis(value, axis=mixture_axis - comp_dists_ndim) - - comp_logp = comp_dists.logp(value) - return check_parameters( - logsumexp(at.log(w) + comp_logp, axis=mixture_axis, keepdims=False), - w >= 0, - w <= 1, - at.allclose(w.sum(axis=mixture_axis - comp_dists_ndim), 1), - broadcast_conditions=False, - ) + @classmethod + def dist(cls, w, mu, sigma=None, tau=None, sd=None, comp_shape=(), **kwargs): + if sd is not None: + sigma = sd + _, sigma = get_tau_sigma(tau=tau, sigma=sigma) - def random(self, point=None, size=None): - """ - Draw random values from defined ``MixtureSameFamily`` distribution. - - Parameters - ---------- - point : dict, optional - Dict of variable values on which random values are to be - conditioned (uses default point if not specified). - size : int, optional - Desired size of random sample (returns one sample if not - specified). - - Returns - ------- - array - """ - # sample_shape = to_tuple(size) - # mixture_axis = self.mixture_axis - # - # # First we draw values for the mixture component weights - # (w,) = draw_values([self.w], point=point, size=size) - # - # # We now draw random choices from those weights. - # # However, we have to ensure that the number of choices has the - # # sample_shape present. 
- # w_shape = w.shape - # batch_shape = self.comp_dists.shape[: mixture_axis + 1] - # param_shape = np.broadcast(np.empty(w_shape), np.empty(batch_shape)).shape - # event_shape = self.comp_dists.shape[mixture_axis + 1 :] - # - # if np.asarray(self.shape).size != 0: - # comp_dists_ndim = len(self.comp_dists.shape) - # - # # If event_shape of both comp_dists and supplied shape matches, - # # broadcast only batch_shape - # # else broadcast the entire given shape with batch_shape. - # if list(self.shape[mixture_axis - comp_dists_ndim + 1 :]) == list(event_shape): - # dist_shape = np.broadcast( - # np.empty(self.shape[:mixture_axis]), np.empty(param_shape[:mixture_axis]) - # ).shape - # else: - # dist_shape = np.broadcast( - # np.empty(self.shape), np.empty(param_shape[:mixture_axis]) - # ).shape - # else: - # dist_shape = param_shape[:mixture_axis] - # - # # Try to determine the size that must be used to get the mixture - # # components (i.e. get random choices using w). - # # 1. There must be size independent choices based on w. - # # 2. There must also be independent draws for each non singleton axis - # # of w. - # # 3. There must also be independent draws for each dimension added by - # # self.shape with respect to the w.ndim. These usually correspond to - # # observed variables with batch shapes - # wsh = (1,) * (len(dist_shape) - len(w_shape) + 1) + w_shape[:mixture_axis] - # psh = (1,) * (len(dist_shape) - len(param_shape) + 1) + param_shape[:mixture_axis] - # w_sample_size = [] - # # Loop through the dist_shape to get the conditions 2 and 3 first - # for i in range(len(dist_shape)): - # if dist_shape[i] != psh[i] and wsh[i] == 1: - # # self.shape[i] is a non singleton dimension (usually caused by - # # observed data) - # sh = dist_shape[i] - # else: - # sh = wsh[i] - # w_sample_size.append(sh) - # - # if sample_shape is not None and w_sample_size[: len(sample_shape)] != sample_shape: - # w_sample_size = sample_shape + tuple(w_sample_size) - # - # choices = random_choice(p=w, size=w_sample_size) - # - # # We now draw samples from the mixture components random method - # comp_samples = self.comp_dists.random(point=point, size=size) - # if comp_samples.shape[: len(sample_shape)] != sample_shape: - # comp_samples = np.broadcast_to( - # comp_samples, - # shape=sample_shape + comp_samples.shape, - # ) - # - # # At this point the shapes of the arrays involved are: - # # comp_samples.shape = (sample_shape, batch_shape, mixture_axis, event_shape) - # # choices.shape = (sample_shape, batch_shape) - # # - # # To be able to take the choices along the mixture_axis of the - # # comp_samples, we have to add in dimensions to the right of the - # # choices array. - # # We also need to make sure that the batch_shapes of both the comp_samples - # # and choices broadcast with each other. 
- # - # choices = np.reshape(choices, choices.shape + (1,) * (1 + len(event_shape))) - # - # choices, comp_samples = get_broadcastable_dist_samples([choices, comp_samples], size=size) - # - # # We now take the choices of the mixture components along the mixture_axis - # # but we use the negative index representation to be able to handle the - # # sample_shape - # samples = np.take_along_axis( - # comp_samples, choices, axis=mixture_axis - len(self.comp_dists.shape) - # ) - # - # # The `samples` array still has the `mixture_axis`, so we must remove it: - # output = samples[(..., 0) + (slice(None),) * len(event_shape)] - # return output - - def _distr_parameters_for_repr(self): - return [] + return Mixture.dist(w, Normal.dist(mu, sigma=sigma, size=comp_shape), **kwargs) diff --git a/pymc/tests/test_distributions_moments.py b/pymc/tests/test_distributions_moments.py index f9def30e9cf..6b3aa60fe42 100644 --- a/pymc/tests/test_distributions_moments.py +++ b/pymc/tests/test_distributions_moments.py @@ -100,9 +100,6 @@ def test_all_distributions_have_moments(): # Distributions that have not been refactored for V4 yet not_implemented = { - dist_module.mixture.Mixture, - dist_module.mixture.MixtureSameFamily, - dist_module.mixture.NormalMixture, dist_module.timeseries.AR, dist_module.timeseries.AR1, dist_module.timeseries.GARCH11, diff --git a/pymc/tests/test_distributions_random.py b/pymc/tests/test_distributions_random.py index c400d8e57bb..f01be905414 100644 --- a/pymc/tests/test_distributions_random.py +++ b/pymc/tests/test_distributions_random.py @@ -19,7 +19,6 @@ import aesara import numpy as np -import numpy.random as nr import numpy.testing as npt import pytest import scipy.stats as st @@ -59,7 +58,6 @@ def random_polyagamma(*args, **kwargs): R, RandomPdMatrix, Rplus, - Simplex, build_model, product, ) @@ -75,6 +73,7 @@ def pymc_random( fails=10, extra_args=None, model_args=None, + change_rv_size_fn=change_rv_size, ): if valuedomain is None: valuedomain = Domain([0], edges=(None, None)) @@ -83,7 +82,7 @@ def pymc_random( model_args = {} model, param_vars = build_model(dist, valuedomain, paramdomains, extra_args) - model_dist = change_rv_size(model.named_vars["value"], size, expand=True) + model_dist = change_rv_size_fn(model.named_vars["value"], size, expand=True) pymc_rand = aesara.function([], model_dist) domains = paramdomains.copy() @@ -2045,106 +2044,6 @@ def check_draws_match_expected(self): assert np.all(np.abs(x.eval() - np.array([0.5, 0, 2.0])) < 0.01) -class TestScalarParameterSamples(SeededTest): - @pytest.mark.xfail(reason="This distribution has not been refactored for v4") - def test_normalmixture(self): - def ref_rand(size, w, mu, sigma): - component = np.random.choice(w.size, size=size, p=w) - return np.random.normal(mu[component], sigma[component], size=size) - - pymc_random( - pm.NormalMixture, - { - "w": Simplex(2), - "mu": Domain([[0.05, 2.5], [-5.0, 1.0]], edges=(None, None)), - "sigma": Domain([[1, 1], [1.5, 2.0]], edges=(None, None)), - }, - extra_args={"comp_shape": 2}, - size=1000, - ref_rand=ref_rand, - ) - pymc_random( - pm.NormalMixture, - { - "w": Simplex(3), - "mu": Domain([[-5.0, 1.0, 2.5]], edges=(None, None)), - "sigma": Domain([[1.5, 2.0, 3.0]], edges=(None, None)), - }, - extra_args={"comp_shape": 3}, - size=1000, - ref_rand=ref_rand, - ) - - -@pytest.mark.xfail(reason="This distribution has not been refactored for v4") -def test_mixture_random_shape(): - # test the shape broadcasting in mixture random - y = np.concatenate([nr.poisson(5, size=10), 
nr.poisson(9, size=10)]) - with pm.Model() as m: - comp0 = pm.Poisson.dist(mu=np.ones(2)) - w0 = pm.Dirichlet("w0", a=np.ones(2), shape=(2,)) - like0 = pm.Mixture("like0", w=w0, comp_dists=comp0, observed=y) - - comp1 = pm.Poisson.dist(mu=np.ones((20, 2)), shape=(20, 2)) - w1 = pm.Dirichlet("w1", a=np.ones(2), shape=(2,)) - like1 = pm.Mixture("like1", w=w1, comp_dists=comp1, observed=y) - - comp2 = pm.Poisson.dist(mu=np.ones(2)) - w2 = pm.Dirichlet("w2", a=np.ones(2), shape=(20, 2)) - like2 = pm.Mixture("like2", w=w2, comp_dists=comp2, observed=y) - - comp3 = pm.Poisson.dist(mu=np.ones(2), shape=(20, 2)) - w3 = pm.Dirichlet("w3", a=np.ones(2), shape=(20, 2)) - like3 = pm.Mixture("like3", w=w3, comp_dists=comp3, observed=y) - - # XXX: This needs to be refactored - rand0, rand1, rand2, rand3 = [None] * 4 # draw_values( - # [like0, like1, like2, like3], point=m.initial_point, size=100 - # ) - assert rand0.shape == (100, 20) - assert rand1.shape == (100, 20) - assert rand2.shape == (100, 20) - assert rand3.shape == (100, 20) - - with m: - ppc = pm.sample_posterior_predictive([m.compute_initial_point()], samples=200) - assert ppc["like0"].shape == (200, 20) - assert ppc["like1"].shape == (200, 20) - assert ppc["like2"].shape == (200, 20) - assert ppc["like3"].shape == (200, 20) - - -@pytest.mark.xfail(reason="This distribution has not been refactored for v4") -def test_mixture_random_shape_fast(): - # test the shape broadcasting in mixture random - y = np.concatenate([nr.poisson(5, size=10), nr.poisson(9, size=10)]) - with pm.Model() as m: - comp0 = pm.Poisson.dist(mu=np.ones(2)) - w0 = pm.Dirichlet("w0", a=np.ones(2), shape=(2,)) - like0 = pm.Mixture("like0", w=w0, comp_dists=comp0, observed=y) - - comp1 = pm.Poisson.dist(mu=np.ones((20, 2)), shape=(20, 2)) - w1 = pm.Dirichlet("w1", a=np.ones(2), shape=(2,)) - like1 = pm.Mixture("like1", w=w1, comp_dists=comp1, observed=y) - - comp2 = pm.Poisson.dist(mu=np.ones(2)) - w2 = pm.Dirichlet("w2", a=np.ones(2), shape=(20, 2)) - like2 = pm.Mixture("like2", w=w2, comp_dists=comp2, observed=y) - - comp3 = pm.Poisson.dist(mu=np.ones(2), shape=(20, 2)) - w3 = pm.Dirichlet("w3", a=np.ones(2), shape=(20, 2)) - like3 = pm.Mixture("like3", w=w3, comp_dists=comp3, observed=y) - - # XXX: This needs to be refactored - rand0, rand1, rand2, rand3 = [None] * 4 # draw_values( - # [like0, like1, like2, like3], point=m.initial_point, size=100 - # ) - assert rand0.shape == (100, 20) - assert rand1.shape == (100, 20) - assert rand2.shape == (100, 20) - assert rand3.shape == (100, 20) - - class TestDensityDist: @pytest.mark.parametrize("size", [(), (3,), (3, 2)], ids=str) def test_density_dist_with_random(self, size): diff --git a/pymc/tests/test_mixture.py b/pymc/tests/test_mixture.py index bd231959d44..51754aa7880 100644 --- a/pymc/tests/test_mixture.py +++ b/pymc/tests/test_mixture.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from contextlib import ExitStack as does_not_raise + import aesara import numpy as np import pytest @@ -21,29 +23,40 @@ from numpy.testing import assert_allclose from scipy.special import logsumexp -import pymc as pm - -from pymc import ( +from pymc.aesaraf import floatX +from pymc.distributions import ( + Categorical, Dirichlet, + DirichletMultinomial, Exponential, Gamma, + HalfNormal, + LKJCholeskyCov, LogNormal, - Metropolis, Mixture, - Model, + Multinomial, MvNormal, Normal, NormalMixture, Poisson, - sample, ) -from pymc.aesaraf import floatX +from pymc.distributions.logprob import logp from pymc.distributions.shape_utils import to_tuple +from pymc.math import expand_packed_triangular +from pymc.model import Model +from pymc.sampling import ( + draw, + sample, + sample_posterior_predictive, + sample_prior_predictive, +) +from pymc.step_methods import Metropolis from pymc.tests.helpers import SeededTest +from pymc.tests.test_distributions import Domain, Simplex +from pymc.tests.test_distributions_moments import assert_moment_is_expected +from pymc.tests.test_distributions_random import pymc_random -pytestmark = pytest.mark.xfail(reason="Mixture not refactored.") -# Generate data def generate_normal_mixture_data(w, mu, sd, size=1000): component = np.random.choice(w.size, size=size, p=w) mu, sd = np.broadcast_arrays(mu, sd) @@ -69,168 +82,318 @@ def generate_poisson_mixture_data(w, mu, size=1000): class TestMixture(SeededTest): - @classmethod - def setup_class(cls): - super().setup_class() - - cls.norm_w = np.array([0.75, 0.25]) - cls.norm_mu = np.array([0.0, 5.0]) - cls.norm_sd = np.ones_like(cls.norm_mu) - cls.norm_x = generate_normal_mixture_data(cls.norm_w, cls.norm_mu, cls.norm_sd, size=1000) - - cls.pois_w = np.array([0.4, 0.6]) - cls.pois_mu = np.array([5.0, 20.0]) - cls.pois_x = generate_poisson_mixture_data(cls.pois_w, cls.pois_mu, size=1000) - - def test_dimensions(self): - a1 = Normal.dist(mu=0, sigma=1) - a2 = Normal.dist(mu=10, sigma=1) - mix = Mixture.dist(w=np.r_[0.5, 0.5], comp_dists=[a1, a2]) - - assert mix.mode.ndim == 0 - assert mix.logp(0.0).ndim == 0 - - value = np.r_[0.0, 1.0, 2.0] - assert mix.logp(value).ndim == 1 - - def test_mixture_list_of_normals(self): - with Model() as model: - w = Dirichlet("w", floatX(np.ones_like(self.norm_w)), shape=self.norm_w.size) - mu = Normal("mu", 0.0, 10.0, shape=self.norm_w.size) - tau = Gamma("tau", 1.0, 1.0, shape=self.norm_w.size) - Mixture( - "x_obs", - w, - [Normal.dist(mu[0], tau=tau[0]), Normal.dist(mu[1], tau=tau[1])], - observed=self.norm_x, + def get_inital_point(self, model): + """Get initial point with untransformed variables for posterior predictive sampling""" + return { + var.name: initial_point + for var, initial_point in zip( + model.unobserved_value_vars, + model.compile_fn(model.unobserved_value_vars)(model.compute_initial_point()), ) - step = Metropolis() - trace = sample(5000, step, random_seed=self.random_seed, progressbar=False, chains=1) - - assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(self.norm_w), rtol=0.1, atol=0.1) - assert_allclose( - np.sort(trace["mu"].mean(axis=0)), np.sort(self.norm_mu), rtol=0.1, atol=0.1 - ) - - def test_normal_mixture(self): - with Model() as model: - w = Dirichlet("w", floatX(np.ones_like(self.norm_w)), shape=self.norm_w.size) - mu = Normal("mu", 0.0, 10.0, shape=self.norm_w.size) - tau = Gamma("tau", 1.0, 1.0, shape=self.norm_w.size) - NormalMixture("x_obs", w, mu, tau=tau, observed=self.norm_x) - step = Metropolis() - trace = sample(5000, step, 
random_seed=self.random_seed, progressbar=False, chains=1) - - assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(self.norm_w), rtol=0.1, atol=0.1) - assert_allclose( - np.sort(trace["mu"].mean(axis=0)), np.sort(self.norm_mu), rtol=0.1, atol=0.1 - ) + } @pytest.mark.parametrize( - "nd,ncomp", [(tuple(), 5), (1, 5), (3, 5), ((3, 3), 5), (3, 3), ((3, 3), 3)], ids=str + "weights", + [ + np.array([1, 0]), + np.array([[1, 0], [0, 1], [1, 0]]), + ], ) - def test_normal_mixture_nd(self, nd, ncomp): - nd = to_tuple(nd) - ncomp = int(ncomp) - comp_shape = nd + (ncomp,) - test_mus = np.random.randn(*comp_shape) - test_taus = np.random.gamma(1, 1, size=comp_shape) - observed = generate_normal_mixture_data( - w=np.ones(ncomp) / ncomp, mu=test_mus, sd=1 / np.sqrt(test_taus), size=10 - ) - - with Model() as model0: - mus = Normal("mus", shape=comp_shape) - taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) - ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) - mixture0 = NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd, comp_shape=comp_shape) - obs0 = NormalMixture( - "obs", w=ws, mu=mus, tau=taus, shape=nd, comp_shape=comp_shape, observed=observed - ) + @pytest.mark.parametrize( + "component", + [ + Normal.dist([-10, 10]), + Normal.dist([-10, 10], size=(3, 2)), + Normal.dist([[-15, 15], [-10, 10], [-5, 5]], 1e-3), + Normal.dist([-10, 10], size=(4, 3, 2)), + ], + ) + @pytest.mark.parametrize("size", [None, (3,), (4, 3)]) + def test_single_univariate_component_deterministic_weights(self, weights, component, size): + # Size can't be smaller than what is implied by replication dimensions + if size is not None and len(size) < max(component.ndim - 1, weights.ndim - 1): + return + + mix = Mixture.dist(weights, component, size=size) + mix_eval = mix.eval() + + # Test shape + # component shape is either (4, 3, 2), (3, 2) or (2,) + # weights shape is either (3, 2) or (2,) + if size is not None: + expected_shape = size + elif component.ndim == 3: + expected_shape = (4, 3) + elif component.ndim == 2 or weights.ndim == 2: + expected_shape = (3,) + else: + expected_shape = () + assert mix_eval.shape == expected_shape + + # Test draws + expected_positive = np.zeros_like(mix_eval) + if expected_positive.ndim > 0: + expected_positive[..., :] = (weights == 1)[..., 1] + assert np.all((mix_eval > 0) == expected_positive) + repetitions = np.unique(mix_eval).size < mix_eval.size + assert not repetitions + + # Test logp + mix_logp_eval = logp(mix, mix_eval).eval() + assert mix_logp_eval.shape == expected_shape + bcast_weights = np.broadcast_to(weights, (*expected_shape, 2)) + expected_logp = logp(component, mix_eval[..., None]).eval()[bcast_weights == 1] + expected_logp = expected_logp.reshape(expected_shape) + assert np.allclose(mix_logp_eval, expected_logp) - with Model() as model1: - mus = Normal("mus", shape=comp_shape) - taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) - ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) - comp_dist = [ - Normal.dist(mu=mus[..., i], tau=taus[..., i], shape=nd) for i in range(ncomp) - ] - mixture1 = Mixture("m", w=ws, comp_dists=comp_dist, shape=nd) - obs1 = Mixture("obs", w=ws, comp_dists=comp_dist, shape=nd, observed=observed) + @pytest.mark.parametrize( + "weights", + [ + np.array([1, 0]), + np.array([[1, 0], [0, 1], [1, 0]]), + ], + ) + @pytest.mark.parametrize( + "components", + [ + (Normal.dist(-10, 1e-3), Normal.dist(10, 1e-3)), + (Normal.dist(-10, 1e-3, size=(3,)), Normal.dist(10, 1e-3, size=(3,))), + (Normal.dist([-15, -10, -5], 1e-3), Normal.dist([15, 
10, 5], 1e-3)), + (Normal.dist(-10, 1e-3, size=(4, 3)), Normal.dist(10, 1e-3, size=(4, 3))), + ], + ) + @pytest.mark.parametrize("size", [None, (3,), (4, 3)]) + def test_list_univariate_components_deterministic_weights(self, weights, components, size): + # Size can't be smaller than what is implied by replication dimensions + if size is not None and len(size) < max(components[0].ndim, weights.ndim - 1): + return + + mix = Mixture.dist(weights, components, size=size) + mix_eval = mix.eval() + + # Test shape + # components[0] shape is either (4, 3), (3,) or () + # weights shape is either (3, 2) or (2,) + if size is not None: + expected_shape = size + elif components[0].ndim == 2: + expected_shape = (4, 3) + elif components[0].ndim == 1 or weights.ndim == 2: + expected_shape = (3,) + else: + expected_shape = () + assert mix_eval.shape == expected_shape + + # Test draws + expected_positive = np.zeros_like(mix_eval) + if expected_positive.ndim > 0: + expected_positive[..., :] = (weights == 1)[..., 1] + assert np.all((mix_eval > 0) == expected_positive) + repetitions = np.unique(mix_eval).size < mix_eval.size + assert not repetitions + + # Test logp + mix_logp_eval = logp(mix, mix_eval).eval() + assert mix_logp_eval.shape == expected_shape + bcast_weights = np.broadcast_to(weights, (*expected_shape, 2)) + expected_logp = np.stack( + ( + logp(components[0], mix_eval).eval(), + logp(components[1], mix_eval).eval(), + ), + axis=-1, + )[bcast_weights == 1] + expected_logp = expected_logp.reshape(expected_shape) + assert np.allclose(mix_logp_eval, expected_logp) - with Model() as model2: - # Expected to fail if comp_shape is not provided, - # nd is multidim and it does not broadcast with ncomp. If by chance - # it does broadcast, an error is raised if the mixture is given - # observed data. - # Furthermore, the Mixture will also raise errors when the observed - # data is multidimensional but it does not broadcast well with - # comp_dists. 
- mus = Normal("mus", shape=comp_shape) - taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) - ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) - if len(nd) > 1: - if nd[-1] != ncomp: - with pytest.raises(ValueError): - NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd) - mixture2 = None - else: - mixture2 = NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd) - else: - mixture2 = NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd) - observed_fails = False - if len(nd) >= 1 and nd != (1,): - try: - np.broadcast(np.empty(comp_shape), observed) - except Exception: - observed_fails = True - if observed_fails: - with pytest.raises(ValueError): - NormalMixture("obs", w=ws, mu=mus, tau=taus, shape=nd, observed=observed) - obs2 = None - else: - obs2 = NormalMixture("obs", w=ws, mu=mus, tau=taus, shape=nd, observed=observed) + @pytest.mark.parametrize( + "weights", + [ + np.array([1, 0]), + np.array([[1, 0], [0, 1], [1, 0], [0, 1]]), + ], + ) + @pytest.mark.parametrize( + "component", + [ + DirichletMultinomial.dist(n=[5_000, 10_000], a=np.ones((3,))), + DirichletMultinomial.dist(n=[5_000, 10_000], a=np.ones((3,)), size=(4, 2)), + ], + ) + @pytest.mark.parametrize("size", [None, (4,), (5, 4)]) + def test_single_multivariate_component_deterministic_weights(self, weights, component, size): + # This test needs seeding to avoid repetitions + rngs = [ + aesara.shared(np.random.default_rng(seed)) + for seed in self.get_random_state().randint(2**30, size=2) + ] + mix = Mixture.dist(weights, component, size=size, rngs=rngs) + mix_eval = mix.eval() + + # Test shape + # component shape is either (4, 2, 3), (2, 3) + # weights shape is either (4, 2) or (2,) + if size is not None: + expected_shape = size + (3,) + elif component.ndim == 3 or weights.ndim == 2: + expected_shape = (4, 3) + else: + expected_shape = (3,) + assert mix_eval.shape == expected_shape + + # Test draws + totals = mix_eval.sum(-1) + expected_large_count = (weights == 1)[..., 1] + assert np.all((totals == 10_000) == expected_large_count) + repetitions = np.unique(mix_eval[..., 0]).size < totals.size + assert not repetitions + + # Test logp + mix_logp_eval = logp(mix, mix_eval).eval() + expected_logp_shape = expected_shape[:-1] + assert mix_logp_eval.shape == expected_logp_shape + bcast_weights = np.broadcast_to(weights, (*expected_logp_shape, 2)) + expected_logp = logp(component, mix_eval[..., None, :]).eval()[bcast_weights == 1] + expected_logp = expected_logp.reshape(expected_logp_shape) + assert np.allclose(mix_logp_eval, expected_logp) - testpoint = model0.compute_initial_point() - testpoint["mus"] = test_mus - testpoint["taus"] = test_taus - assert_allclose(model0.logp(testpoint), model1.logp(testpoint)) - assert_allclose(mixture0.logp(testpoint), mixture1.logp(testpoint)) - assert_allclose(obs0.logp(testpoint), obs1.logp(testpoint)) - if mixture2 is not None and obs2 is not None: - assert_allclose(model0.logp(testpoint), model2.logp(testpoint)) - if mixture2 is not None: - assert_allclose(mixture0.logp(testpoint), mixture2.logp(testpoint)) - if obs2 is not None: - assert_allclose(obs0.logp(testpoint), obs2.logp(testpoint)) - - def test_poisson_mixture(self): - with Model() as model: - w = Dirichlet("w", floatX(np.ones_like(self.pois_w)), shape=self.pois_w.shape) - mu = Gamma("mu", 1.0, 1.0, shape=self.pois_w.size) - Mixture("x_obs", w, Poisson.dist(mu), observed=self.pois_x) - step = Metropolis() - trace = sample(5000, step, random_seed=self.random_seed, progressbar=False, chains=1) + @pytest.mark.parametrize( + "weights", 
+ [ + np.array([1, 0]), + np.array([[1, 0], [0, 1], [1, 0], [0, 1]]), + ], + ) + @pytest.mark.parametrize( + "components", + [ + ( + MvNormal.dist([-15, -10, -5], np.eye(3) * 1e-3), + MvNormal.dist([15, 10, 5], np.eye(3) * 1e-3), + ), + ( + MvNormal.dist([-15, -10, -5], np.eye(3) * 1e-3, size=(4,)), + MvNormal.dist([15, 10, 5], np.eye(3) * 1e-3, size=(4,)), + ), + ], + ) + @pytest.mark.parametrize("size", [None, (4,), (5, 4)]) + def test_list_multivariate_components_deterministic_weights(self, weights, components, size): + mix = Mixture.dist(weights, components, size=size) + mix_eval = mix.eval() + + # Test shape + # components[0] shape is either (4, 3) or (3,) + # weights shape is either (4, 2) or (2,) + if size is not None: + expected_shape = size + (3,) + elif components[0].ndim == 2 or weights.ndim == 2: + expected_shape = (4, 3) + else: + expected_shape = (3,) + assert mix_eval.shape == expected_shape + + # Test draws + expected_positive = np.zeros_like(mix_eval) + expected_positive[..., :] = (weights == 1)[..., 1, None] + assert np.all((mix_eval > 0) == expected_positive) + repetitions = np.unique(mix_eval).size < mix_eval.size + assert not repetitions + + # Test logp + # MvNormal logp is currently limited to 2d values + expectation = pytest.raises(ValueError) if mix_eval.ndim > 2 else does_not_raise() + with expectation: + mix_logp_eval = logp(mix, mix_eval).eval() + assert mix_logp_eval.shape == expected_shape[:-1] + bcast_weights = np.broadcast_to(weights, (*expected_shape[:-1], 2)) + expected_logp = np.stack( + ( + logp(components[0], mix_eval).eval(), + logp(components[1], mix_eval).eval(), + ), + axis=-1, + )[bcast_weights == 1] + expected_logp = expected_logp.reshape(expected_shape[:-1]) + assert np.allclose(mix_logp_eval, expected_logp) + + def test_component_choice_random(self): + """Test that mixture choices change over evaluations""" + with Model() as m: + weights = [0.5, 0.5] + components = [Normal.dist(-10, 0.01), Normal.dist(10, 0.01)] + mix = Mixture.dist(weights, components) + draws = draw(mix, draws=10) + # Probability of coming from same component 10 times is 0.5**10 + assert np.unique(draws > 0).size == 2 - assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(self.pois_w), rtol=0.1, atol=0.1) - assert_allclose( - np.sort(trace["mu"].mean(axis=0)), np.sort(self.pois_mu), rtol=0.1, atol=0.1 + @pytest.mark.parametrize( + "comp_dists", + ( + [Normal.dist(size=(2,))], + [Normal.dist(), Normal.dist()], + [MvNormal.dist(np.ones(3), np.eye(3), size=(2,))], + [ + MvNormal.dist(np.ones(3), np.eye(3)), + MvNormal.dist(np.ones(3), np.eye(3)), + ], + ), + ) + def test_components_expanded_by_weights(self, comp_dists): + """Test that components are expanded when size or weights are larger than components""" + univariate = comp_dists[0].owner.op.ndim_supp == 0 + + mix = Mixture.dist( + w=Dirichlet.dist([1, 1], shape=(3, 2)), + comp_dists=comp_dists, + size=(3,), ) - - def test_mixture_list_of_poissons(self): - with Model() as model: - w = Dirichlet("w", floatX(np.ones_like(self.pois_w)), shape=self.pois_w.shape) - mu = Gamma("mu", 1.0, 1.0, shape=self.pois_w.size) - Mixture("x_obs", w, [Poisson.dist(mu[0]), Poisson.dist(mu[1])], observed=self.pois_x) - step = Metropolis() - trace = sample(5000, step, random_seed=self.random_seed, progressbar=False, chains=1) - - assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(self.pois_w), rtol=0.1, atol=0.1) - assert_allclose( - np.sort(trace["mu"].mean(axis=0)), np.sort(self.pois_mu), rtol=0.1, atol=0.1 + draws = mix.eval() + 
assert draws.shape == (3,) if univariate else (3, 3) + assert np.unique(draws).size == draws.size + + mix = Mixture.dist( + w=Dirichlet.dist([1, 1], shape=(4, 3, 2)), + comp_dists=comp_dists, + size=(3,), ) + draws = mix.eval() + assert draws.shape == (4, 3) if univariate else (4, 3, 3) + assert np.unique(draws).size == draws.size - def test_mixture_of_mvn(self): + @pytest.mark.parametrize( + "comp_dists", + ( + [Normal.dist(size=(2,))], + [Normal.dist(), Normal.dist()], + [MvNormal.dist(np.ones(3), np.eye(3), size=(2,))], + [ + MvNormal.dist(np.ones(3), np.eye(3)), + MvNormal.dist(np.ones(3), np.eye(3)), + ], + ), + ) + @pytest.mark.parametrize("expand", (False, True)) + def test_change_size(self, comp_dists, expand): + univariate = comp_dists[0].owner.op.ndim_supp == 0 + + mix = Mixture.dist(w=Dirichlet.dist([1, 1]), comp_dists=comp_dists) + mix = Mixture.change_size(mix, new_size=(4,), expand=expand) + draws = mix.eval() + expected_shape = (4,) if univariate else (4, 3) + assert draws.shape == expected_shape + assert np.unique(draws).size == draws.size + + mix = Mixture.dist(w=Dirichlet.dist([1, 1]), comp_dists=comp_dists, size=(3,)) + mix = Mixture.change_size(mix, new_size=(5, 4), expand=expand) + draws = mix.eval() + expected_shape = (5, 4) if univariate else (5, 4, 3) + if expand: + expected_shape = expected_shape + (3,) + assert draws.shape == expected_shape + assert np.unique(draws).size == draws.size + + def test_list_mvnormals_logp(self): mu1 = np.asarray([0.0, 1.0]) cov1 = np.diag([1.5, 2.5]) mu2 = np.asarray([1.0, 0.0]) @@ -249,22 +412,170 @@ def test_mixture_of_mvn(self): st.multivariate_normal.logpdf(obs, mu2, cov2), ) ).T - complogp = y.distribution._comp_logp(aesara.shared(obs)).eval() - assert_allclose(complogp, complogp_st) # check logp of mixture testpoint = model.compute_initial_point() mixlogp_st = logsumexp(np.log(testpoint["w"]) + complogp_st, axis=-1, keepdims=False) - assert_allclose(y.logp_elemwise(testpoint), mixlogp_st) + assert_allclose(model.compile_logp(y, sum=False)(testpoint)[0], mixlogp_st) # check logp of model priorlogp = st.dirichlet.logpdf( x=testpoint["w"], alpha=np.ones(2), ) - assert_allclose(model.logp(testpoint), mixlogp_st.sum() + priorlogp) + assert_allclose(model.compile_logp()(testpoint), mixlogp_st.sum() + priorlogp) + + def test_single_poisson_sampling(self): + pois_w = np.array([0.4, 0.6]) + pois_mu = np.array([5.0, 20.0]) + pois_x = generate_poisson_mixture_data(pois_w, pois_mu, size=1000) + + with Model() as model: + w = Dirichlet("w", floatX(np.ones_like(pois_w)), shape=pois_w.shape) + mu = Gamma("mu", 1.0, 1.0, shape=pois_w.size) + Mixture("x_obs", w, Poisson.dist(mu), observed=pois_x) + step = Metropolis() + trace = sample( + 5000, + step, + random_seed=self.random_seed, + progressbar=False, + chains=1, + return_inferencedata=False, + ) + + assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(pois_w), rtol=0.1, atol=0.1) + assert_allclose(np.sort(trace["mu"].mean(axis=0)), np.sort(pois_mu), rtol=0.1, atol=0.1) + + def test_list_poissons_sampling(self): + pois_w = np.array([0.4, 0.6]) + pois_mu = np.array([5.0, 20.0]) + pois_x = generate_poisson_mixture_data(pois_w, pois_mu, size=1000) + + with Model() as model: + w = Dirichlet("w", floatX(np.ones_like(pois_w)), shape=pois_w.shape) + mu = Gamma("mu", 1.0, 1.0, shape=pois_w.size) + Mixture("x_obs", w, [Poisson.dist(mu[0]), Poisson.dist(mu[1])], observed=pois_x) + trace = sample( + 5000, + chains=1, + step=Metropolis(), + random_seed=self.random_seed, + progressbar=False, + 
return_inferencedata=False, + ) + + assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(pois_w), rtol=0.1, atol=0.1) + assert_allclose(np.sort(trace["mu"].mean(axis=0)), np.sort(pois_mu), rtol=0.1, atol=0.1) + + def test_list_normals_sampling(self): + norm_w = np.array([0.75, 0.25]) + norm_mu = np.array([0.0, 5.0]) + norm_sd = np.ones_like(norm_mu) + norm_x = generate_normal_mixture_data(norm_w, norm_mu, norm_sd, size=1000) + + with Model() as model: + w = Dirichlet("w", floatX(np.ones_like(norm_w)), shape=norm_w.size) + mu = Normal("mu", 0.0, 10.0, shape=norm_w.size) + tau = Gamma("tau", 1.0, 1.0, shape=norm_w.size) + Mixture( + "x_obs", + w, + [Normal.dist(mu[0], tau=tau[0]), Normal.dist(mu[1], tau=tau[1])], + observed=norm_x, + ) + trace = sample( + 5000, + chains=1, + step=Metropolis(), + random_seed=self.random_seed, + progressbar=False, + return_inferencedata=False, + ) + + assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(norm_w), rtol=0.1, atol=0.1) + assert_allclose(np.sort(trace["mu"].mean(axis=0)), np.sort(norm_mu), rtol=0.1, atol=0.1) + + def test_single_poisson_predictive_sampling_shape(self): + # test the shape broadcasting in mixture random + rng = self.get_random_state() + y = np.concatenate([rng.poisson(5, size=10), rng.poisson(9, size=10)]) + with Model() as model: + comp0 = Poisson.dist(mu=np.ones(2)) + w0 = Dirichlet("w0", a=np.ones(2), shape=(2,)) + like0 = Mixture("like0", w=w0, comp_dists=comp0, observed=y) + + comp1 = Poisson.dist(mu=np.ones((20, 2)), shape=(20, 2)) + w1 = Dirichlet("w1", a=np.ones(2), shape=(2,)) + like1 = Mixture("like1", w=w1, comp_dists=comp1, observed=y) + + comp2 = Poisson.dist(mu=np.ones(2)) + w2 = Dirichlet("w2", a=np.ones(2), shape=(20, 2)) + like2 = Mixture("like2", w=w2, comp_dists=comp2, observed=y) + + comp3 = Poisson.dist(mu=np.ones(2), shape=(20, 2)) + w3 = Dirichlet("w3", a=np.ones(2), shape=(20, 2)) + like3 = Mixture("like3", w=w3, comp_dists=comp3, observed=y) + + n_samples = 30 + with model: + prior = sample_prior_predictive(samples=n_samples, return_inferencedata=False) + ppc = sample_posterior_predictive( + [self.get_inital_point(model)], samples=n_samples, return_inferencedata=False + ) + + assert prior["like0"].shape == (n_samples, 20) + assert prior["like1"].shape == (n_samples, 20) + assert prior["like2"].shape == (n_samples, 20) + assert prior["like3"].shape == (n_samples, 20) + + assert ppc["like0"].shape == (n_samples, 20) + assert ppc["like1"].shape == (n_samples, 20) + assert ppc["like2"].shape == (n_samples, 20) + assert ppc["like3"].shape == (n_samples, 20) + + def test_list_mvnormals_predictive_sampling_shape(self): + N = 100 # number of data points + K = 3 # number of mixture components + D = 3 # dimensionality of the data + X = MvNormal.dist(np.zeros(D), np.eye(D), size=N).eval() + + with Model() as model: + pi = Dirichlet("pi", np.ones(K), shape=(K,)) + + comp_dist = [] + mu = [] + packed_chol = [] + chol = [] + for i in range(K): + mu.append(Normal(f"mu{i}", 0, 10, shape=D)) + packed_chol.append( + LKJCholeskyCov( + f"chol_cov_{i}", + eta=2, + n=D, + sd_dist=HalfNormal.dist(2.5, size=D), + compute_corr=False, + ) + ) + chol.append(expand_packed_triangular(D, packed_chol[i], lower=True)) + comp_dist.append(MvNormal.dist(mu=mu[i], chol=chol[i], shape=D)) + + Mixture("x_obs", pi, comp_dist, observed=X) + + n_samples = 20 + with model: + prior = sample_prior_predictive(samples=n_samples, return_inferencedata=False) + ppc = sample_posterior_predictive( + [self.get_inital_point(model)], samples=n_samples, 
return_inferencedata=False + ) + assert ppc["x_obs"].shape == (n_samples,) + X.shape + assert prior["x_obs"].shape == (n_samples,) + X.shape + assert prior["mu0"].shape == (n_samples, D) + assert prior["chol_cov_0"].shape == (n_samples, D * (D + 1) // 2) - def test_mixture_of_mixture(self): + @pytest.mark.xfail(reason="Mixture from single component not refactored yet") + def test_nested_mixture(self): if aesara.config.floatX == "float32": rtol = 1e-4 else: @@ -338,127 +649,195 @@ def mixmixlogp(value, point): assert_allclose(mixmixlogpg, mix.logp_elemwise(test_point), rtol=rtol) assert_allclose(priorlogp + mixmixlogpg.sum(), model.logp(test_point), rtol=rtol) - def test_sample_prior_and_posterior(self): - def build_toy_dataset(N, K): - pi = np.array([0.2, 0.5, 0.3]) - mus = [[1, 1, 1], [-1, -1, -1], [2, -2, 0]] - stds = [[0.1, 0.1, 0.1], [0.1, 0.2, 0.2], [0.2, 0.3, 0.3]] - x = np.zeros((N, 3), dtype=np.float32) - y = np.zeros((N,), dtype=np.int) - for n in range(N): - k = np.argmax(np.random.multinomial(1, pi)) - x[n, :] = np.random.multivariate_normal(mus[k], np.diag(stds[k])) - y[n] = k - return x, y + def test_iterable_single_component_warning(self): + with pytest.warns(None) as record: + Mixture.dist(w=[0.5, 0.5], comp_dists=Normal.dist(size=2)) + Mixture.dist(w=[0.5, 0.5], comp_dists=[Normal.dist(size=2), Normal.dist(size=2)]) + assert not record - N = 100 # number of data points - K = 3 # number of mixture components - D = 3 # dimensionality of the data + with pytest.warns(UserWarning, match="Single component will be treated as a mixture"): + Mixture.dist(w=[0.5, 0.5], comp_dists=[Normal.dist(size=2)]) - X, y = build_toy_dataset(N, K) - with pm.Model() as model: - pi = pm.Dirichlet("pi", np.ones(K), shape=(K,)) - - comp_dist = [] - mu = [] - packed_chol = [] - chol = [] - for i in range(K): - mu.append(pm.Normal("mu%i" % i, 0, 10, shape=D)) - packed_chol.append( - pm.LKJCholeskyCov( - "chol_cov_%i" % i, eta=2, n=D, sd_dist=pm.HalfNormal.dist(2.5, size=D) - ) - ) - chol.append(pm.expand_packed_triangular(D, packed_chol[i], lower=True)) - comp_dist.append(pm.MvNormal.dist(mu=mu[i], chol=chol[i], shape=D)) +class TestNormalMixture(SeededTest): + def test_normal_mixture_sampling(self): + norm_w = np.array([0.75, 0.25]) + norm_mu = np.array([0.0, 5.0]) + norm_sd = np.ones_like(norm_mu) + norm_x = generate_normal_mixture_data(norm_w, norm_mu, norm_sd, size=1000) - pm.Mixture("x_obs", pi, comp_dist, observed=X) - with model: - idata = pm.sample(30, tune=10, chains=1) + with Model() as model: + w = Dirichlet("w", floatX(np.ones_like(norm_w)), shape=norm_w.size) + mu = Normal("mu", 0.0, 10.0, shape=norm_w.size) + tau = Gamma("tau", 1.0, 1.0, shape=norm_w.size) + NormalMixture("x_obs", w, mu, tau=tau, observed=norm_x) + step = Metropolis() + trace = sample( + 5000, + step, + random_seed=self.random_seed, + progressbar=False, + chains=1, + return_inferencedata=False, + ) - n_samples = 20 - with model: - ppc = pm.sample_posterior_predictive(idata, n_samples) - prior = pm.sample_prior_predictive(samples=n_samples) - assert ppc["x_obs"].shape == (n_samples,) + X.shape - assert prior["x_obs"].shape == (n_samples,) + X.shape - assert prior["mu0"].shape == (n_samples, D) - assert prior["chol_cov_0"].shape == (n_samples, D * (D + 1) // 2) + assert_allclose(np.sort(trace["w"].mean(axis=0)), np.sort(norm_w), rtol=0.1, atol=0.1) + assert_allclose(np.sort(trace["mu"].mean(axis=0)), np.sort(norm_mu), rtol=0.1, atol=0.1) + @pytest.mark.parametrize( + "nd, ncomp", [(tuple(), 5), (1, 5), (3, 5), ((3, 3), 5), 
(3, 3), ((3, 3), 3)], ids=str + ) + def test_normal_mixture_nd(self, nd, ncomp): + nd = to_tuple(nd) + ncomp = int(ncomp) + comp_shape = nd + (ncomp,) + test_mus = np.random.randn(*comp_shape) + test_taus = np.random.gamma(1, 1, size=comp_shape) + observed = generate_normal_mixture_data( + w=np.ones(ncomp) / ncomp, mu=test_mus, sd=1 / np.sqrt(test_taus), size=10 + ) -class TestMixtureVsLatent(SeededTest): - def setup_method(self, *args, **kwargs): - super().setup_method(*args, **kwargs) - self.nd = 3 - self.npop = 3 - self.mus = at.as_tensor_variable( - np.tile( - np.reshape( - np.arange(self.npop), - ( - 1, - -1, - ), - ), - ( - self.nd, - 1, - ), + with Model() as model0: + mus = Normal("mus", shape=comp_shape) + taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) + ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) + mixture0 = NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd, comp_shape=comp_shape) + obs0 = NormalMixture( + "obs", w=ws, mu=mus, tau=taus, comp_shape=comp_shape, observed=observed ) + + with Model() as model1: + mus = Normal("mus", shape=comp_shape) + taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) + ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) + comp_dist = [ + Normal.dist(mu=mus[..., i], tau=taus[..., i], shape=nd) for i in range(ncomp) + ] + mixture1 = Mixture("m", w=ws, comp_dists=comp_dist, shape=nd) + obs1 = Mixture("obs", w=ws, comp_dists=comp_dist, observed=observed) + + with Model() as model2: + # Test that results are correct without comp_shape being passed to the Mixture. + # This used to fail in V3 + mus = Normal("mus", shape=comp_shape) + taus = Gamma("taus", alpha=1, beta=1, shape=comp_shape) + ws = Dirichlet("ws", np.ones(ncomp), shape=(ncomp,)) + mixture2 = NormalMixture("m", w=ws, mu=mus, tau=taus, shape=nd) + obs2 = NormalMixture("obs", w=ws, mu=mus, tau=taus, observed=observed) + + testpoint = model0.compute_initial_point() + testpoint["mus"] = test_mus + testpoint["taus_log__"] = np.log(test_taus) + for logp0, logp1, logp2 in zip( + model0.compile_logp(vars=[mixture0, obs0], sum=False)(testpoint), + model1.compile_logp(vars=[mixture1, obs1], sum=False)(testpoint), + model2.compile_logp(vars=[mixture2, obs2], sum=False)(testpoint), + ): + assert_allclose(logp0, logp1) + assert_allclose(logp0, logp2) + + def test_random(self): + def ref_rand(size, w, mu, sigma): + component = np.random.choice(w.size, size=size, p=w) + return np.random.normal(mu[component], sigma[component], size=size) + + pymc_random( + NormalMixture, + { + "w": Simplex(2), + "mu": Domain([[0.05, 2.5], [-5.0, 1.0]], edges=(None, None)), + "sigma": Domain([[1, 1], [1.5, 2.0]], edges=(None, None)), + }, + extra_args={"comp_shape": 2}, + size=1000, + ref_rand=ref_rand, + change_rv_size_fn=Mixture.change_size, + ) + pymc_random( + NormalMixture, + { + "w": Simplex(3), + "mu": Domain([[-5.0, 1.0, 2.5]], edges=(None, None)), + "sigma": Domain([[1.5, 2.0, 3.0]], edges=(None, None)), + }, + extra_args={"comp_shape": 3}, + size=1000, + ref_rand=ref_rand, + change_rv_size_fn=Mixture.change_size, ) - def test_1d_w(self): - nd = self.nd - npop = self.npop - mus = self.mus - size = 100 - with pm.Model() as model: - m = pm.NormalMixture( - "m", w=np.ones(npop) / npop, mu=mus, sigma=1e-5, comp_shape=(nd, npop), shape=nd - ) - z = pm.Categorical("z", p=np.ones(npop) / npop) - latent_m = pm.Normal("latent_m", mu=mus[..., z], sigma=1e-5, shape=nd) - m_val = m.random(size=size) - latent_m_val = latent_m.random(size=size) - assert m_val.shape == latent_m_val.shape - # Test that each 
element in axis = -1 comes from the same mixture - # component - assert all(np.all(np.diff(m_val) < 1e-3, axis=-1)) - assert all(np.all(np.diff(latent_m_val) < 1e-3, axis=-1)) +class TestMixtureVsLatent(SeededTest): + """This class contains tests that compare a marginal Mixture with a latent indexed Mixture""" - self.samples_from_same_distribution(m_val, latent_m_val) - self.logp_matches(m, latent_m, z, npop, model=model) + def test_scalar_components(self): + nd = 3 + npop = 4 + # [[0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3]] + mus = at.constant(np.full((nd, npop), np.arange(npop))) - def test_2d_w(self): - nd = self.nd - npop = self.npop - mus = self.mus - size = 100 - with pm.Model() as model: - m = pm.NormalMixture( + with Model(rng_seeder=self.get_random_state()) as model: + m = NormalMixture( "m", - w=np.ones((nd, npop)) / npop, + w=np.ones(npop) / npop, mu=mus, sigma=1e-5, comp_shape=(nd, npop), shape=nd, ) - z = pm.Categorical("z", p=np.ones(npop) / npop, shape=nd) + z = Categorical("z", p=np.ones(npop) / npop, shape=nd) mu = at.as_tensor_variable([mus[i, z[i]] for i in range(nd)]) - latent_m = pm.Normal("latent_m", mu=mu, sigma=1e-5, shape=nd) + latent_m = Normal("latent_m", mu=mu, sigma=1e-5, shape=nd) + + size = 100 + m_val = draw(m, draws=size) + latent_m_val = draw(latent_m, draws=size) - m_val = m.random(size=size) - latent_m_val = latent_m.random(size=size) assert m_val.shape == latent_m_val.shape # Test that each element in axis = -1 can come from independent # components assert not all(np.all(np.diff(m_val) < 1e-3, axis=-1)) assert not all(np.all(np.diff(latent_m_val) < 1e-3, axis=-1)) + self.samples_from_same_distribution(m_val, latent_m_val) + + # Check that logp is the same whether elements of the last axis are mixed or not + logp_fn = model.compile_logp(vars=[m]) + assert np.isclose(logp_fn({"m": [0, 0, 0]}), logp_fn({"m": [0, 1, 2]})) + self.logp_matches(m, latent_m, z, npop, model=model) + def test_vector_components(self): + nd = 3 + npop = 4 + # [[0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3]] + mus = at.constant(np.full((nd, npop), np.arange(npop))) + + with Model(rng_seeder=self.get_random_state()) as model: + m = Mixture( + "m", + w=np.ones(npop) / npop, + # MvNormal distribution with squared sigma diagonal covariance should + # be equal to vector of Normals from latent_m + comp_dists=[MvNormal.dist(mus[:, i], np.eye(nd) * 1e-5**2) for i in range(npop)], + ) + z = Categorical("z", p=np.ones(npop) / npop) + latent_m = Normal("latent_m", mu=mus[..., z], sigma=1e-5, shape=nd) + + size = 100 + m_val = draw(m, draws=size) + latent_m_val = draw(latent_m, draws=size) + assert m_val.shape == latent_m_val.shape + # Test that each element in axis = -1 comes from the same mixture + # component + assert np.all(np.diff(m_val) < 1e-3) + assert np.all(np.diff(latent_m_val) < 1e-3) + # TODO: The following statistical test appears to be more flaky than expected + # even though the distributions should be the same. 
Seeding should make it + # stable but might be worth investigating further self.samples_from_same_distribution(m_val, latent_m_val) + + # Check that mixing of values in the last axis leads to smaller logp + logp_fn = model.compile_logp(vars=[m]) + assert logp_fn({"m": [0, 0, 0]}) > logp_fn({"m": [0, 1, 0]}) > logp_fn({"m": [0, 1, 2]}) self.logp_matches(m, latent_m, z, npop, model=model) def samples_from_same_distribution(self, *args): @@ -468,35 +847,51 @@ def samples_from_same_distribution(self, *args): _, p_correlation = st.ks_2samp( *(np.array([np.corrcoef(ss) for ss in s]).flatten() for s in args) ) + # This has a success rate of 10% (0.95**2), even if the distributions are the same assert p_marginal >= 0.05 and p_correlation >= 0.05 def logp_matches(self, mixture, latent_mix, z, npop, model): + def loose_logp(model, vars): + """Return logp function that accepts dictionary with unused variables as input""" + return model.compile_fn( + model.logpt(vars=vars, sum=False), + inputs=model.value_vars, + on_unused_input="ignore", + ) + if aesara.config.floatX == "float32": rtol = 1e-4 else: rtol = 1e-7 test_point = model.compute_initial_point() - test_point["latent_m"] = test_point["m"] - mix_logp = mixture.logp(test_point) - logps = [] + test_point["m"] = test_point["latent_m"] + + mix_logp = loose_logp(model, mixture)(test_point)[0] + + z_shape = z.shape.eval() + latent_mix_components_logps = [] for component in range(npop): - test_point["z"] = component * np.ones(z.distribution.shape) - # Count the number of axes that should be broadcasted from z to - # modify the logp - sh1 = test_point["z"].shape - sh2 = test_point["latent_m"].shape - if len(sh1) > len(sh2): - sh2 = (1,) * (len(sh1) - len(sh2)) + sh2 - elif len(sh2) > len(sh1): - sh1 = (1,) * (len(sh2) - len(sh1)) + sh1 - reps = np.prod([s2 if s1 != s2 else 1 for s1, s2 in zip(sh1, sh2)]) - z_logp = z.logp(test_point) * reps - logps.append(z_logp + latent_mix.logp(test_point)) - latent_mix_logp = logsumexp(np.array(logps), axis=0) + test_point["z"] = np.full(z_shape, component) + z_logp = loose_logp(model, z)(test_point)[0] + latent_mix_component_logp = loose_logp(model, latent_mix)(test_point)[0] + # If the mixture ndim_supp is a vector, the logp should be summed within + # components, as its items are not independent + if mix_logp.ndim == 0: + latent_mix_component_logp = latent_mix_component_logp.sum() + latent_mix_components_logps.append(z_logp + latent_mix_component_logp) + latent_mix_logp = logsumexp(np.array(latent_mix_components_logps), axis=0) + if mix_logp.ndim == 0: + latent_mix_logp = latent_mix_logp.sum() + assert_allclose(mix_logp, latent_mix_logp, rtol=rtol) class TestMixtureSameFamily(SeededTest): + """Tests that used to belong to deprecated `TestMixtureSameFamily`. 
+ + The functionality is now expected to be provided by `Mixture` + """ + @classmethod def setup_class(cls): super().setup_class() @@ -510,19 +905,18 @@ def test_with_multinomial(self, batch_shape): n = 100 * np.ones((*batch_shape, 1)) w = np.ones(self.mixture_comps) / self.mixture_comps mixture_axis = len(batch_shape) - with pm.Model() as model: - comp_dists = pm.Multinomial.dist(p=p, n=n, shape=(*batch_shape, self.mixture_comps, 3)) - mixture = pm.MixtureSameFamily( + with Model() as model: + comp_dists = Multinomial.dist(p=p, n=n, shape=(*batch_shape, self.mixture_comps, 3)) + mixture = Mixture( "mixture", w=w, comp_dists=comp_dists, - mixture_axis=mixture_axis, shape=(*batch_shape, 3), ) - prior = pm.sample_prior_predictive(samples=self.n_samples) + prior = sample_prior_predictive(samples=self.n_samples, return_inferencedata=False) assert prior["mixture"].shape == (self.n_samples, *batch_shape, 3) - assert mixture.random(size=self.size).shape == (self.size, *batch_shape, 3) + assert draw(mixture, draws=self.size).shape == (self.size, *batch_shape, 3) if aesara.config.floatX == "float32": rtol = 1e-4 @@ -530,18 +924,16 @@ def test_with_multinomial(self, batch_shape): rtol = 1e-7 initial_point = model.compute_initial_point() - comp_logp = comp_dists.logp(initial_point["mixture"].reshape(*batch_shape, 1, 3)) + comp_logp = logp(comp_dists, initial_point["mixture"].reshape(*batch_shape, 1, 3)) log_sum_exp = logsumexp( - comp_logp.eval() + np.log(w)[..., None], axis=mixture_axis, keepdims=True + comp_logp.eval() + np.log(w), axis=mixture_axis, keepdims=True ).sum() assert_allclose( - model.logp(initial_point), + model.compile_logp()(initial_point), log_sum_exp, rtol, ) - # TODO: Handle case when `batch_shape` == `sample_shape`. - # See https://github.com/pymc-devs/pymc/issues/4185 for details. 
def test_with_mvnormal(self): # 10 batch, 3-variate Gaussian mu = np.random.randn(self.mixture_comps, 3) @@ -550,15 +942,13 @@ def test_with_mvnormal(self): chol = np.linalg.cholesky(cov) w = np.ones(self.mixture_comps) / self.mixture_comps - with pm.Model() as model: - comp_dists = pm.MvNormal.dist(mu=mu, chol=chol, shape=(self.mixture_comps, 3)) - mixture = pm.MixtureSameFamily( - "mixture", w=w, comp_dists=comp_dists, mixture_axis=0, shape=(3,) - ) - prior = pm.sample_prior_predictive(samples=self.n_samples) + with Model() as model: + comp_dists = MvNormal.dist(mu=mu, chol=chol, shape=(self.mixture_comps, 3)) + mixture = Mixture("mixture", w=w, comp_dists=comp_dists, shape=(3,)) + prior = sample_prior_predictive(samples=self.n_samples, return_inferencedata=False) assert prior["mixture"].shape == (self.n_samples, 3) - assert mixture.random(size=self.size).shape == (self.size, 3) + assert draw(mixture, draws=self.size).shape == (self.size, 3) if aesara.config.floatX == "float32": rtol = 1e-4 @@ -566,23 +956,209 @@ def test_with_mvnormal(self): rtol = 1e-7 initial_point = model.compute_initial_point() - comp_logp = comp_dists.logp(initial_point["mixture"].reshape(1, 3)) - log_sum_exp = logsumexp( - comp_logp.eval() + np.log(w)[..., None], axis=0, keepdims=True - ).sum() + comp_logp = logp(comp_dists, initial_point["mixture"].reshape(1, 3)) + log_sum_exp = logsumexp(comp_logp.eval() + np.log(w), axis=0, keepdims=True).sum() assert_allclose( - model.logp(initial_point), + model.compile_logp()(initial_point), log_sum_exp, rtol, ) def test_broadcasting_in_shape(self): - with pm.Model() as model: - mu = pm.Gamma("mu", 1.0, 1.0, shape=2) - comp_dists = pm.Poisson.dist(mu, shape=2) - mix = pm.MixtureSameFamily( - "mix", w=np.ones(2) / 2, comp_dists=comp_dists, shape=(1000,) - ) - prior = pm.sample_prior_predictive(samples=self.n_samples) + with Model() as model: + mu = Gamma("mu", 1.0, 1.0, shape=2) + comp_dists = Poisson.dist(mu, shape=2) + mix = Mixture("mix", w=np.ones(2) / 2, comp_dists=comp_dists, shape=(1000,)) + prior = sample_prior_predictive(samples=self.n_samples, return_inferencedata=False) assert prior["mix"].shape == (self.n_samples, 1000) + + +class TestMixtureMoments: + @pytest.mark.parametrize( + "weights, comp_dists, size, expected", + [ + ( + np.array([0.4, 0.6]), + Normal.dist(mu=np.array([-2, 6]), sigma=np.array([5, 3])), + None, + 2.8, + ), + ( + np.tile(1 / 13, 13), + Normal.dist(-2, 1, size=(13,)), + (3,), + np.full((3,), -2), + ), + ( + np.array([0.4, 0.6]), + Normal.dist([-2, 6], 3), + (5, 3), + np.full((5, 3), 2.8), + ), + ( + np.broadcast_to(np.array([0.4, 0.6]), (5, 3, 2)), + Normal.dist(np.array([-2, 6]), np.array([5, 3])), + None, + np.full(shape=(5, 3), fill_value=2.8), + ), + ( + np.array([0.4, 0.6]), + Normal.dist(np.array([-2, 6]), np.array([5, 3]), size=(5, 3, 2)), + None, + np.full(shape=(5, 3), fill_value=2.8), + ), + ( + np.array([[0.8, 0.2], [0.2, 0.8]]), + Normal.dist(np.array([-2, 6])), + None, + np.array([-0.4, 4.4]), + ), + # implied size = (11, 7) will be overwritten by (5, 3) + ( + np.array([0.4, 0.6]), + Normal.dist(np.array([-2, 6]), np.array([5, 3]), size=(11, 7, 2)), + (5, 3), + np.full(shape=(5, 3), fill_value=2.8), + ), + ], + ) + def test_single_univariate_component(self, weights, comp_dists, size, expected): + with Model() as model: + Mixture("x", weights, comp_dists, size=size) + assert_moment_is_expected(model, expected, check_finite_logp=False) + + @pytest.mark.parametrize( + "weights, comp_dists, size, expected", + [ + ( + np.array([1, 0]), 
+ [Normal.dist(-2, 5), Normal.dist(6, 3)], + None, + -2, + ), + ( + np.array([0.4, 0.6]), + [Normal.dist(-2, 5, size=(2,)), Normal.dist(6, 3, size=(2,))], + None, + np.full((2,), 2.8), + ), + ( + np.array([0.5, 0.5]), + [Normal.dist(-2, 5), Exponential.dist(lam=1 / 3)], + (3, 5), + np.full((3, 5), 0.5), + ), + ( + np.broadcast_to(np.array([0.4, 0.6]), (5, 3, 2)), + [Normal.dist(-2, 5), Normal.dist(6, 3)], + None, + np.full(shape=(5, 3), fill_value=2.8), + ), + ( + np.array([[0.8, 0.2], [0.2, 0.8]]), + [Normal.dist(-2, 5), Normal.dist(6, 3)], + None, + np.array([-0.4, 4.4]), + ), + ( + np.array([[0.8, 0.2], [0.2, 0.8]]), + [Normal.dist(-2, 5), Normal.dist(6, 3)], + (3, 2), + np.full((3, 2), np.array([-0.4, 4.4])), + ), + ( + # implied size = (11, 7) will be overwritten by (5, 3) + np.array([0.4, 0.6]), + [Normal.dist(-2, 5, size=(11, 7)), Normal.dist(6, 3, size=(11, 7))], + (5, 3), + np.full(shape=(5, 3), fill_value=2.8), + ), + ], + ) + def test_list_univariate_components(self, weights, comp_dists, size, expected): + with Model() as model: + Mixture("x", weights, comp_dists, size=size) + assert_moment_is_expected(model, expected, check_finite_logp=False) + + @pytest.mark.parametrize( + "weights, comp_dists, size, expected", + [ + ( + np.array([0.4, 0.6]), + MvNormal.dist(mu=np.array([[-1, -2], [3, 5]]), cov=np.eye(2) * 0.3), + None, + np.array([1.4, 2.2]), + ), + ( + np.array([0.5, 0.5]), + Dirichlet.dist(a=np.array([[0.0001, 0.0001, 1000], [2, 4, 6]])), + (4,), + np.array(np.full((4, 3), [1 / 12, 1 / 6, 3 / 4])), + ), + ( + np.array([0.4, 0.6]), + MvNormal.dist(mu=np.array([-10, 0, 10]), cov=np.eye(3) * 3, size=(4, 2)), + None, + np.full((4, 3), [-10, 0, 10]), + ), + ( + np.array([[1.0, 0], [0.0, 1.0]]), + MvNormal.dist( + mu=np.array([[-5, -10, -15], [5, 10, 15]]), cov=np.eye(3) * 3, size=(2,) + ), + (3, 2), + np.full((3, 2, 3), [[-5, -10, -15], [5, 10, 15]]), + ), + ], + ) + def test_single_multivariate_component(self, weights, comp_dists, size, expected): + with Model() as model: + Mixture("x", weights, comp_dists, size=size) + assert_moment_is_expected(model, expected, check_finite_logp=False) + + @pytest.mark.parametrize( + "weights, comp_dists, size, expected", + [ + ( + np.array([0.4, 0.6]), + [ + MvNormal.dist(mu=np.array([-1, -2]), cov=np.eye(2) * 0.3), + MvNormal.dist(mu=np.array([3, 5]), cov=np.eye(2) * 0.8), + ], + None, + np.array([1.4, 2.2]), + ), + ( + np.array([0.4, 0.6]), + [ + Dirichlet.dist(a=np.array([2, 3, 5])), + MvNormal.dist(mu=np.array([-10, 0, 10]), cov=np.eye(3) * 3), + ], + (4,), + np.array(np.full((4, 3), [-5.92, 0.12, 6.2])), + ), + ( + np.array([0.4, 0.6]), + [ + Dirichlet.dist(a=np.array([2, 3, 5]), size=(2,)), + MvNormal.dist(mu=np.array([-10, 0, 10]), cov=np.eye(3) * 3, size=(2,)), + ], + None, + np.full((2, 3), [-5.92, 0.12, 6.2]), + ), + ( + np.array([[1.0, 0], [0.0, 1.0]]), + [ + MvNormal.dist(mu=np.array([-5, -10, -15]), cov=np.eye(3) * 3, size=(2,)), + MvNormal.dist(mu=np.array([5, 10, 15]), cov=np.eye(3) * 3, size=(2,)), + ], + (3, 2), + np.full((3, 2, 3), [[-5, -10, -15], [5, 10, 15]]), + ), + ], + ) + def test_list_multivariate_components(self, weights, comp_dists, size, expected): + with Model() as model: + Mixture("x", weights, comp_dists, size=size) + assert_moment_is_expected(model, expected, check_finite_logp=False)
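
The tests in this patch all exercise the same refactored `Mixture` API: weights plus either a list of `.dist()` components or a single batched component, with draws taken via plain `.eval()` and densities checked through the `logp` helper the test module imports. The snippet below is a minimal sketch of that usage outside the test harness, not part of the patch itself; it assumes only public names the tests already rely on (`pm.Mixture`, `pm.Normal`, `pm.Dirichlet`, `pm.logp`), and the concrete weights and data are purely illustrative.

    import numpy as np
    import pymc as pm

    # Unregistered form, as used throughout the tests: build the variable with
    # .dist() and evaluate draws and log-probabilities directly.
    # A single batched component: the last size dimension indexes the 2 components.
    mix = pm.Mixture.dist(
        w=[0.3, 0.7],
        comp_dists=pm.Normal.dist(mu=[-5.0, 5.0], sigma=1.0),
    )
    print(mix.eval())                # one draw from the marginalized mixture
    print(pm.logp(mix, 0.0).eval())  # mixture log-density evaluated at 0.0

    # Equivalent list-of-components form, registered in a model with observed data.
    data = np.random.normal(loc=5.0, scale=1.0, size=100)
    with pm.Model():
        w = pm.Dirichlet("w", a=np.ones(2))
        pm.Mixture(
            "obs",
            w=w,
            comp_dists=[pm.Normal.dist(-5.0, 1.0), pm.Normal.dist(5.0, 1.0)],
            observed=data,
        )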