"""
Provides wrappers for several random number generators (RNGs), giving them all a
common interface so that they can be used interchangeably in PyNN.
Classes:
NumpyRNG - uses the np.random.RandomState RNG
GSLRNG - uses the RNGs from the Gnu Scientific Library
NativeRNG - indicates to the simulator that it should use it's own,
built-in RNG
RandomDistribution - produces random numbers from a specific distribution
:copyright: Copyright 2006-2023 by the PyNN team, see AUTHORS.
:license: CeCILL, see LICENSE for details.
"""
from copy import deepcopy
import logging
from functools import reduce
import operator
import time
import numpy as np
try:
import pygsl.rng
have_gsl = True
except (ImportError, Warning):
have_gsl = False
from lazyarray import partial_shape
logger = logging.getLogger("PyNN")
available_distributions = {
'binomial': ('n', 'p'),
'gamma': ('k', 'theta'),
'exponential': ('beta',),
'lognormal': ('mu', 'sigma'),
'normal': ('mu', 'sigma'),
'normal_clipped': ('mu', 'sigma', 'low', 'high'),
'normal_clipped_to_boundary':
('mu', 'sigma', 'low', 'high'),
'poisson': ('lambda_',),
'uniform': ('low', 'high'),
'uniform_int': ('low', 'high'),
'vonmises': ('mu', 'kappa'),
}
MAX_REDRAWS = 1000 # for clipped distributions
def get_mpi_config():
try:
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.rank
num_processes = MPI.COMM_WORLD.size
except ImportError:
mpi_rank = 0
num_processes = 1
return mpi_rank, num_processes
class AbstractRNG(object):
"""Abstract class for wrapping random number generators. The idea is to be
able to use either simulator-native rngs, which may be more efficient, or a
standard Python rng, e.g. a np.random.RandomState object, which would
allow the same random numbers to be used across different simulators, or
simply to read externally-generated numbers from files."""
def __init__(self, seed=None):
if seed is not None:
assert isinstance(seed, int), "`seed` must be an int, not a %s" % type(seed).__name__
self.seed = seed
# define some aliases
self.random = self.next
self.sample = self.next
def __repr__(self):
return "%s(seed=%r)" % (self.__class__.__name__, self.seed)
def next(self, n=None, distribution=None, parameters=None, mask=None):
"""Return `n` random numbers from the specified distribution.
If:
* `n` is None, return a float,
* `n` >= 1, return a Numpy array,
* `n` < 0, raise an Exception,
* `n` is 0, return an empty array.
If called with distribution=None, returns uniformly distributed floats in the range [0, 1)
If `mask` is provided, it should be a boolean or integer NumPy array,
indicating that only a subset of the random numbers should be returned.
Example::
rng.next(5, mask=np.array([True, False, True, False, True]))
or::
rng.next(5, mask=np.array([0, 2, 4]))
will each return only three values.
If the rng is "parallel safe", an array of n values will be drawn from the rng,
and the mask applied.
If the rng is not parallel safe, the contents of the mask are disregarded, only its
size (for an integer mask) or the number of True values (for a boolean mask)
is used in determining how many values to draw.
"""
raise NotImplementedError
class WrappedRNG(AbstractRNG):
def __init__(self, seed=None, parallel_safe=True):
AbstractRNG.__init__(self, seed)
self.parallel_safe = parallel_safe
self.mpi_rank, self.num_processes = get_mpi_config()
if self.seed is not None and not parallel_safe:
self.seed += self.mpi_rank # ensure different nodes get different sequences
if self.mpi_rank != 0:
logger.warning("Changing the seed to %s on node %d" % (self.seed, self.mpi_rank))
def next(self, n=None, distribution=None, parameters=None, mask=None):
if distribution is None:
distribution = 'uniform'
if parameters is None:
parameters = {"low": 0.0, "high": 1.0}
if n == 0:
rarr = np.random.rand(0) # We return an empty array
elif n is None:
rarr = self._next(distribution, 1, parameters)
elif n > 0:
if mask is not None:
assert isinstance(mask, np.ndarray)
if mask.dtype == bool:
if mask.size != n:
raise ValueError("boolean mask size must equal n")
if not self.parallel_safe:
if mask.dtype == bool:
n = mask.sum()
elif mask.dtype == np.integer:
n = mask.size
rarr = self._next(distribution, n, parameters)
else:
raise ValueError("The sample number must be positive")
if not isinstance(rarr, np.ndarray):
rarr = np.array(rarr)
if self.parallel_safe and mask is not None:
# strip out the random numbers that should be used on other processors.
rarr = rarr[mask]
if n is None:
return rarr[0]
else:
return rarr
next.__doc__ = AbstractRNG.next.__doc__
def _clipped(self, gen, low=-np.inf, high=np.inf, size=None):
""" """
res = gen(size)
iterations = 0
err_msg = ("Maximum number of redraws exceeded. "
"Check the parameterization of your distribution.")
if size is None:
while res < low or res > high:
# limit the number of iterations.
# Possibility of infinite loop, depending on parameters
if iterations > MAX_REDRAWS:
raise Exception(err_msg)
res = gen(size)
iterations += 1
else:
idx = np.where((res > high) | (res < low))[0]
while idx.size > 0:
if iterations > MAX_REDRAWS:
raise Exception(err_msg)
redrawn = gen(idx.size)
res[idx] = redrawn
idx = idx[np.where((redrawn > high) | (redrawn < low))[0]]
iterations += 1
return res
def describe(self):
return "%s() with seed %s for MPI rank %d (MPI processes %d). %s parallel safe." % (
self.__class__.__name__, self.seed, self.mpi_rank, self.num_processes,
self.parallel_safe and "Is" or "Not")
[docs]class NumpyRNG(WrappedRNG):
"""Wrapper for the :class:`np.random.RandomState` class (Mersenne Twister PRNG)."""
translations = {
'binomial': ('binomial', {'n': 'n', 'p': 'p'}),
'gamma': ('gamma', {'k': 'shape', 'theta': 'scale'}),
'exponential': ('exponential', {'beta': 'scale'}),
'lognormal': ('lognormal', {'mu': 'mean', 'sigma': 'sigma'}),
'normal': ('normal', {'mu': 'loc', 'sigma': 'scale'}),
'normal_clipped': ('normal_clipped', {'mu': 'mu', 'sigma': 'sigma', 'low': 'low', 'high': 'high'}), # noqa:E501
'normal_clipped_to_boundary':
('normal_clipped_to_boundary', {'mu': 'mu', 'sigma': 'sigma', 'low': 'low', 'high': 'high'}), # noqa:E501
'poisson': ('poisson', {'lambda_': 'lam'}),
'uniform': ('uniform', {'low': 'low', 'high': 'high'}),
'uniform_int': ('randint', {'low': 'low', 'high': 'high'}),
'vonmises': ('vonmises', {'mu': 'mu', 'kappa': 'kappa'}),
}
def __init__(self, seed=None, parallel_safe=True):
WrappedRNG.__init__(self, seed, parallel_safe)
self.rng = np.random.RandomState()
if self.seed is not None:
self.rng.seed(self.seed)
else:
self.rng.seed()
def __getattr__(self, name):
"""
This is to give the PyNN RNGs the same methods as the wrapped RNGs
(:class:`np.random.RandomState` or the GSL RNGs.)
"""
return getattr(self.rng, name)
def _next(self, distribution, n, parameters):
# TODO: allow non-standardized distributions to pass through without translation
distribution_np, parameter_map = self.translations[distribution]
if set(parameters.keys()) != set(parameter_map.keys()):
# all parameters must be provided.
# We do not provide default values (this can be discussed).
err_msg = "Incorrect parameterization of random distribution. Expected %s, got %s."
raise KeyError(err_msg % (parameter_map.keys(), parameters.keys()))
parameters_np = dict((parameter_map[k], v) for k, v in parameters.items())
if hasattr(self, distribution_np):
f_distr = getattr(self, distribution_np)
else:
f_distr = getattr(self.rng, distribution_np)
return f_distr(size=n, **parameters_np)
def __deepcopy__(self, memo):
obj = NumpyRNG.__new__(NumpyRNG)
WrappedRNG.__init__(obj, seed=deepcopy(self.seed, memo),
parallel_safe=deepcopy(self.parallel_safe, memo))
obj.rng = deepcopy(self.rng)
return obj
[docs] def normal_clipped(self, mu=0.0, sigma=1.0, low=-np.inf, high=np.inf, size=None):
""" """
# not sure how well this works with parallel_safe, mask_local
def gen(n):
return self.rng.normal(loc=mu, scale=sigma, size=n)
return self._clipped(gen, low=low, high=high, size=size)
[docs] def normal_clipped_to_boundary(self, mu=0.0, sigma=1.0, low=-np.inf, high=np.inf, size=None):
# Not recommended, used `normal_clipped` instead.
# Provided because some models in the literature use this.
res = self.rng.normal(loc=mu, scale=sigma, size=size)
return np.maximum(np.minimum(res, high), low)
[docs]class GSLRNG(WrappedRNG):
"""Wrapper for the GSL random number generators."""
translations = {
'binomial': ('binomial', {'n': 'n', 'p': 'p'}),
'gamma': ('gamma', {'k': 'k', 'theta': 'theta'}),
'exponential': ('exponential', {'beta': 'mu'}),
'lognormal': ('lognormal', {'mu': 'zeta', 'sigma': 'sigma'}),
'normal': ('normal', {'mu': 'mu', 'sigma': 'sigma'}),
'normal_clipped': ('normal_clipped', {'mu': 'mu', 'sigma': 'sigma', 'low': 'low', 'high': 'high'}), # noqa: E501
'poisson': ('poisson', {'lambda_': 'mu'}),
'uniform': ('flat', {'low': 'a', 'high': 'b'}),
'uniform_int': ('uniform_int', {'low': 'low', 'high': 'high'}),
}
def __init__(self, seed=None, type='mt19937', parallel_safe=True):
if not have_gsl:
raise ImportError("GSLRNG: Cannot import pygsl")
WrappedRNG.__init__(self, seed, parallel_safe)
self.rng = getattr(pygsl.rng, type)()
if self.seed is not None:
self.rng.set(self.seed)
else:
self.seed = int(time.time())
self.rng.set(self.seed)
def __getattr__(self, name):
"""This is to give GSLRNG the same methods as the GSL RNGs."""
return getattr(self.rng, name)
def _next(self, distribution, n, parameters):
distribution_gsl, parameter_map = self.translations[distribution]
if set(parameters.keys()) != set(parameter_map.keys()):
# all parameters must be provided.
# We do not provide default values (this can be discussed).
err_msg = "Incorrect parameterization of random distribution. Expected %s, got %s."
raise KeyError(err_msg % (parameter_map.keys(), parameters.keys()))
parameters_gsl = dict((parameter_map[k], v) for k, v in parameters.items())
if hasattr(self, distribution_gsl):
f_distr = getattr(self, distribution_gsl)
else:
f_distr = getattr(self.rng, distribution_gsl)
# Has this been tested? If so, move most of _next to Wrapped RNG since there is almost
# complete overlap with NumpyRNG._next
values = f_distr(size=n, **parameters_gsl)
if n == 1:
values = [values] # to be consistent with NumpyRNG
return values
[docs] def gamma(self, k, theta, size=None):
""" """
return self.rng.gamma(k, 1 / theta, size)
[docs] def normal(self, mu=0.0, sigma=1.0, size=None):
""" """
return mu + self.rng.gaussian(sigma, size)
[docs] def normal_clipped(self, mu=0.0, sigma=1.0, low=-np.inf, high=np.inf, size=None):
""" """
def gen(n):
return self.normal(mu, sigma, n)
return self._clipped(gen, low=low, high=high, size=size)
# should add a wrapper for the built-in Python random module.
[docs]class NativeRNG(AbstractRNG):
"""
Signals that the simulator's own native RNG should be used.
Each simulator module should implement a class of the same name which
inherits from this and which sets the seed appropriately.
"""
def __str__(self):
return "NativeRNG(seed=%s)" % self.seed
[docs]class RandomDistribution(object):
"""
Class which defines a next(n) method which returns an array of `n` random
numbers from a given distribution.
Arguments:
`distribution`:
the name of a random number distribution.
`parameters_pos`:
parameters of the distribution, provided as a tuple. For the correct
ordering, see `random.available_distributions`.
`rng`:
if present, should be a :class:`NumpyRNG`, :class:`GSLRNG` or
:class:`NativeRNG` object.
`parameters_named`:
parameters of the distribution, provided as keyword arguments.
Parameters may be provided either through `parameters_pos` or through
`parameters_named`, but not both. All parameters must be provided, there
are no default values. Parameter names are, in general, as used in Wikipedia.
Examples:
>>> rd = RandomDistribution('uniform', (-70, -50))
>>> rd = RandomDistribution('normal', mu=0.5, sigma=0.1)
>>> rng = NumpyRNG(seed=8658764)
>>> rd = RandomDistribution('gamma', k=2.0, theta=5.0, rng=rng)
Available distributions:
========================== ==================== ===========================================
Name Parameters Comments
-------------------------- -------------------- -------------------------------------------
binomial n, p
gamma k, theta
exponential beta
lognormal mu, sigma
normal mu, sigma
normal_clipped mu, sigma, low, high Values outside (low, high) are redrawn
normal_clipped_to_boundary mu, sigma, low, high Values outside low/high are set to low/high
poisson lambda_ Underscore since lambda is a Python keyword
uniform low, high
uniform_int low, high
vonmises mu, kappa
========================== ==================== ===========================================
"""
def __init__(self, distribution, parameters_pos=None, rng=None, **parameters_named):
"""
Create a new RandomDistribution.
"""
self.name = distribution
self.parameters = self._resolve_parameters(parameters_pos, parameters_named)
if rng:
assert isinstance(rng, AbstractRNG), "rng must be a pyNN.random RNG object"
self.rng = rng
else: # use np.random.RandomState() by default
self.rng = NumpyRNG() # should we provide a seed?
[docs] def next(self, n=None, mask=None):
"""Return `n` random numbers from the distribution."""
res = self.rng.next(n=n,
distribution=self.name,
parameters=self.parameters,
mask=mask)
return res
def __str__(self):
return "RandomDistribution('%(name)s', %(parameters)s, %(rng)s)" % self.__dict__
def _resolve_parameters(self, positional, named):
if positional is None:
if set(named.keys()) != set(available_distributions[self.name]):
err_msg = "Incorrect parameterization of random distribution. Expected %s, got %s."
raise KeyError(err_msg % (available_distributions[self.name], tuple(named.keys())))
return named
elif len(named) == 0:
if isinstance(positional, dict):
raise TypeError("Positional parameters should be a tuple, not a dict")
expected_parameter_names = available_distributions[self.name]
if len(positional) != len(expected_parameter_names):
err_msg = ("Incorrect number of parameters for random distribution. "
f"For {expected_parameter_names} received {positional}")
raise ValueError(err_msg)
else:
return dict((name, value)
for name, value in zip(expected_parameter_names, positional))
else:
raise ValueError("Mixed positional and named parameters")
[docs] def lazily_evaluate(self, mask=None, shape=None):
"""
Generate an array of random numbers of the requested shape.
If a mask is given, produce only enough numbers to fill the
region defined by the mask (hence 'lazily').
This method is called by the lazyarray `evaluate()` and
`_partially_evaluate()` methods.
"""
if mask is None:
# produce an array of random numbers with the requested shape
n = reduce(operator.mul, shape)
res = self.next(n)
if res.shape != shape:
res = res.reshape(shape)
if n == 1:
res = res[0]
else:
# produce an array of random numbers whose shape is
# that of a sub-array produced by applying the mask
# to an array of the requested global shape
p_shape = partial_shape(mask, shape)
if p_shape:
n = reduce(operator.mul, p_shape)
else:
n = 1
res = self.next(n).reshape(p_shape)
return res