# Source code for pyNN.parameters

"""
Parameter set handling

:copyright: Copyright 2006-2024 by the PyNN team, see AUTHORS.
:license: CeCILL, see LICENSE for details.
"""

from collections.abc import Sized
import numpy as np
from lazyarray import larray, partial_shape
from .core import is_listlike
from . import errors
from .random import RandomDistribution, NativeRNG


class LazyArray(larray):
    """
    Optimises storage of arrays in various ways:
      - stores only a single value if all the values in the array are the same
      - if the array is created from a :class:`~pyNN.random.RandomDistribution`
        or a function `f(i,j)`, then elements are only evaluated when they are
        accessed. Any operations performed on the array are also queued up to
        be executed on access.

    The main intention of the latter is to save memory for very large arrays by
    accessing them one row or column at a time: the entire array need never be
    in memory.

    Arguments:
        `value`:
            may be an int, float, bool, NumPy array, iterator, generator
            or a function, `f(i)` or `f(i,j)`, depending on the dimensions of
            the array. `f(i,j)` should return a single number when `i` and `j`
            are integers, and a 1D array when either `i` or `j` or both is a
            NumPy array (in the latter case the two arrays must have equal
            lengths).
            A string is also accepted: it is interpreted as the body of a
            function of a single variable named `d`.
        `shape`:
            a tuple giving the shape of the array, or `None`
        `dtype`:
            the NumPy `dtype`.
    """
    # most of the implementation moved to external lazyarray package
    # the plan is ultimately to move everything to lazyarray

    def __init__(self, value, shape=None, dtype=None):
        """
        Create the lazy array, first turning a string `value` into a function.

        Raises :class:`errors.InvalidParameterValueError` if a string `value`
        is not a valid expression, or refers to a name other than `d` that is
        not defined.
        """
        if isinstance(value, str):
            err_msg = "Value should be a string expressing a function of d. "
            # NOTE(security): the string is passed to eval(), so it must only
            # ever come from a trusted source (e.g. the user's own script).
            try:
                value = eval("lambda d: %s" % value)
            except SyntaxError as err:
                raise errors.InvalidParameterValueError(
                    err_msg + "Incorrect syntax.") from err
            # A trial call catches references to undefined names early.
            try:
                value(0.0)
            except NameError as err:
                raise errors.InvalidParameterValueError(err_msg + str(err)) from err
        super().__init__(value, shape, dtype)

    def __setitem__(self, addr, new_value):
        """
        Set the element(s) at `addr` to `new_value`.

        If the array is homogeneous and already holds `new_value` everywhere,
        nothing is changed. Otherwise the array is fully evaluated, the result
        becomes the new base value and the queue of pending operations is
        cleared.
        """
        self.check_bounds(addr)
        already_set = (
            self.is_homogeneous
            and isinstance(new_value, (int, float, bool))
            and self.evaluate(simplify=True) == new_value
        )
        if already_set:
            return
        self.base_value = self.evaluate()
        self.base_value[addr] = new_value
        self.operations = []

    def by_column(self, mask=None):
        """
        Iterate over the columns of the array. Columns will be yielded either
        as a 1D array or as a single value (for a flat array).

        `mask`: either `None`, a slice, or a boolean array indicating which
                columns should be included.
        """
        column_indices = np.arange(self.ncols)
        if mask is not None:
            if not isinstance(mask, slice):
                assert len(mask) == self.ncols
            column_indices = column_indices[mask]

        parallel_safe = (
            isinstance(self.base_value, RandomDistribution)
            and self.base_value.rng.parallel_safe
        )
        if parallel_safe and mask is not None:
            # A slice cannot be iterated alongside the column indices, so
            # expand it to an explicit per-column boolean selection first.
            if isinstance(mask, slice):
                included = np.zeros(self.ncols, dtype=bool)
                included[mask] = True
            else:
                included = mask
            # Every column is evaluated, including those that are not yielded
            # (presumably so that a parallel-safe random stream is consumed in
            # the same way whatever the mask is).
            for j, local in zip(np.arange(self.ncols), included):
                col = self._partially_evaluate((slice(None), j), simplify=True)
                if local:
                    yield col
        else:
            for j in column_indices:
                yield self._partially_evaluate((slice(None), j), simplify=True)

    def _apply_operations(self, x, addr=None, simplify=False):
        """
        Apply the queued operations, in order, to `x` and return the result.

        Each entry of `self.operations` is a pair `(f, arg)`:
          - if `arg` is `None`, `f` is applied to `x` alone;
          - if `arg` is itself a lazy array, it is evaluated first (only at
            `addr` when an address is given);
          - otherwise `f(x, arg)` is called directly. If that raises
            `TypeError` and `x` is a NumPy array of objects, `f` is applied to
            each element in turn; in any other case the `TypeError` propagates.
        """
        # todo: move this modified version back into lazyarray
        for f, arg in self.operations:
            if arg is None:
                x = f(x)
            elif isinstance(arg, larray):
                if addr is None:
                    x = f(x, arg.evaluate(simplify=simplify))
                else:
                    x = f(x, arg._partially_evaluate(addr, simplify=simplify))
            else:
                try:
                    x = f(x, arg)
                except TypeError:
                    if isinstance(x, np.ndarray) and x.dtype == np.dtype('O'):
                        x = np.array([f(xi, arg) for xi in x])
                    else:
                        raise
        return x
class ArrayParameter(object):
    """
    Represents a parameter whose value consists of multiple values, e.g. a tuple or array.

    The reason for defining this class rather than just using a NumPy array is
    to avoid the ambiguity of "is a given array a single parameter value (e.g.
    a spike train for one cell) or an array of parameter values (e.g. one number
    per cell)?".

    Arguments:
        `value`:
            anything which can be converted to a NumPy array, or another
            :class:`ArrayParameter` object.
    """

    def __init__(self, value):
        if isinstance(value, ArrayParameter):
            self.value = value.value
        elif isinstance(value, np.ndarray):
            # dont overwrite dtype of int arrays
            self.value = value
        else:
            self.value = np.array(value, float)

    # def __len__(self):
    #     This must not be defined, otherwise ArrayParameter is insufficiently different
    #     from NumPy array

    @staticmethod
    def _is_multivalued(val):
        """
        Return True if `val` should be treated as a collection of operands.

        Anything with a `__len__` attribute counts, except 0-dimensional
        arrays (like Brian units), which have a `shape` of length zero and
        cannot be iterated over.
        """
        return hasattr(val, '__len__') and not (
            hasattr(val, "shape") and len(val.shape) == 0
        )

    def max(self):
        """Return the maximum value."""
        return self.value.max()

    def __getitem__(self, item):
        return self.value[item]

    def __add__(self, val):
        """
        Return a new :class:`ArrayParameter` in which all values in the original
        :class:`ArrayParameter` have `val` added to them.

        If `val` is itself an array, return an array of :class:`ArrayParameter`
        objects, where ArrayParameter `i` is the original ArrayParameter added to
        element `i` of val.
        """
        if self._is_multivalued(val):
            # reshape if necessary?
            return np.array([self.__class__(self.value + x) for x in val],
                            dtype=self.__class__)
        return self.__class__(self.value + val)

    def __sub__(self, val):
        """
        Return a new :class:`ArrayParameter` in which all values in the original
        :class:`ArrayParameter` have `val` subtracted from them.

        If `val` is itself an array, return an array of :class:`ArrayParameter`
        objects, where ArrayParameter `i` is the original ArrayParameter with
        element `i` of val subtracted from it.
        """
        if self._is_multivalued(val):
            # reshape if necessary?
            return np.array([self.__class__(self.value - x) for x in val],
                            dtype=self.__class__)
        return self.__class__(self.value - val)

    def __mul__(self, val):
        """
        Return a new :class:`ArrayParameter` in which all values in the original
        :class:`ArrayParameter` have been multiplied by `val`.

        If `val` is itself an array, return an array of :class:`ArrayParameter`
        objects, where ArrayParameter `i` is the original ArrayParameter
        multiplied by element `i` of `val`.
        """
        if self._is_multivalued(val):
            # reshape if necessary?
            return np.array([self.__class__(self.value * x) for x in val],
                            dtype=self.__class__)
        return self.__class__(self.value * val)

    __rmul__ = __mul__

    def __div__(self, val):
        """
        Return a new :class:`ArrayParameter` in which all values in the original
        :class:`ArrayParameter` have been divided by `val`.

        If `val` is itself an array, return an array of :class:`ArrayParameter`
        objects, where ArrayParameter `i` is the original ArrayParameter
        divided by element `i` of `val`.
        """
        if self._is_multivalued(val):
            # reshape if necessary?
            return np.array([self.__class__(self.value / x) for x in val],
                            dtype=self.__class__)
        return self.__class__(self.value / val)

    __truediv__ = __div__

    def __eq__(self, other):
        """
        Compare with another :class:`ArrayParameter`, or element-wise with a
        non-empty NumPy array whose first element is an
        :class:`ArrayParameter`.

        Two :class:`ArrayParameter` objects are equal if they hold the same
        number of values and all values match. For an array, a boolean array
        with one entry per element is returned. Anything else compares unequal.
        """
        if isinstance(other, ArrayParameter):
            return self.value.size == other.value.size and (self.value == other.value).all()
        elif (
            isinstance(other, np.ndarray)
            and other.size > 0
            and isinstance(other[0], ArrayParameter)
        ):
            # The pairwise comparison may give a plain Python bool (e.g. when
            # the sizes differ), which has no .all() method, so coerce with
            # bool() instead.
            return np.array([bool(self == seq) for seq in other])
        else:
            return False

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self.value)
class Sequence(ArrayParameter):
    """
    An ordered series of numerical values, treated as one parameter value.

    Arguments:
        `value`:
            anything which can be converted to a NumPy array, or another
            :class:`Sequence` object.
    """
    # should perhaps use neo.SpikeTrain instead of this class,
    # or at least allow a neo SpikeTrain
class ParameterSpace(object):
    """
    Representation of one or more points in a parameter space.

    i.e. represents one or more parameter sets, where each parameter set has
    the same parameter names and types but the parameters may have different
    values.

    Arguments:
        `parameters`:
            a dict containing values of any type that may be used to construct a
            `lazy array`_, i.e. `int`, `float`, NumPy array,
            :class:`~pyNN.random.RandomDistribution`, function that accepts a
            single argument.
        `schema`:
            a dict whose keys are the expected parameter names and whose values
            are the expected parameter types
        `component`:
            optional - class for which the parameters are destined. Used in
            error messages.
        `shape`:
            the shape of the lazy arrays that will be constructed.

    .. _`lazy array`: https://lazyarray.readthedocs.org/
    """

    def __init__(self, parameters, schema=None, shape=None, component=None):
        """
        Store the schema, shape and component, then add every entry of
        `parameters` through :meth:`update`.
        """
        self._parameters = {}
        self.schema = schema
        self._shape = shape
        self.component = component
        self.update(**parameters)
        self._evaluated = False

    def _set_shape(self, shape):
        # Propagate the new shape to every contained value before recording it.
        for value in self._parameters.values():
            value.shape = shape
        self._shape = shape

    shape = property(fget=lambda self: self._shape, fset=_set_shape,
                     doc="Size of the lazy arrays contained within the parameter space")

    def keys(self):
        """
        PS.keys() -> list of PS's keys.
        """
        return self._parameters.keys()

    def items(self):
        """
        PS.items() -> an iterator over the (key, value) items of PS.

        Note that the values will all be :class:`LazyArray` objects.
        """
        return self._parameters.items()

    def __repr__(self):
        return "<ParameterSpace %s, shape=%s>" % (", ".join(self.keys()), self.shape)

    def update(self, **parameters):
        """
        Update the contents of the parameter space according to the
        `(key, value)` pairs in ``**parameters``. All values will be turned into
        lazy arrays.

        If the :class:`ParameterSpace` has a schema, the keys and the data types
        of the values will be checked against the schema.
        """
        for name, value in parameters.items():
            self.__setitem__(name, value)

    def __getitem__(self, name):
        """x.__getitem__(y) <==> x[y]"""
        return self._parameters[name]

    def __setitem__(self, name, value):
        """
        Store `value` under `name`, either as a nested parameter space or as a
        lazy array.

        If the schema has an entry for `name`, that entry decides (a dict
        means "nested space"); otherwise the type of `value` decides.
        """
        if self.schema and name in self.schema:
            is_subspace = isinstance(self.schema[name], dict)
        else:
            is_subspace = isinstance(value, (dict, ParameterSpace))
        if is_subspace:
            self._setitem_subspace(name, value)
        else:
            self._setitem_value(name, value)

    def _setitem_value(self, name, value):
        """
        Wrap `value` in a :class:`LazyArray` and store it under `name`.

        With a schema, raises :class:`errors.NonExistentParameterError` if
        `name` is not in the schema, :class:`errors.InvalidParameterValueError`
        if the lazy array cannot be built from `value`, and
        :class:`errors.InvalidDimensionsError` if building it raises
        `ValueError`.
        """
        if not self.schema:
            self._parameters[name] = LazyArray(value, shape=self._shape)
            return

        try:
            expected_dtype = self.schema[name]
        except KeyError:
            if self.component:
                model_name = self.component.__name__
            else:
                model_name = 'unknown'
            raise errors.NonExistentParameterError(
                name, model_name, valid_parameter_names=self.schema.keys())

        if issubclass(expected_dtype, ArrayParameter) and isinstance(value, Sized):
            if len(value) == 0:
                value = ArrayParameter([])
            elif not isinstance(value[0], ArrayParameter):
                # may be a more generic way to do it, but for now this special-casing
                # seems like the most robust approach
                if isinstance(value[0], Sized):  # e.g. list of tuples
                    value = type(value)([ArrayParameter(x) for x in value])
                else:
                    value = ArrayParameter(value)
        try:
            self._parameters[name] = LazyArray(value, shape=self._shape,
                                               dtype=expected_dtype)
        except (TypeError, errors.InvalidParameterValueError) as err:
            raise errors.InvalidParameterValueError(
                f"For parameter {name} expected {expected_dtype}, got {type(value)}"
            ) from err
        except ValueError as err:
            # maybe put the more specific error classes into lazyarray
            raise errors.InvalidDimensionsError(err) from err

    def _setitem_subspace(self, name, value):
        """
        Store a nested parameter space under `name`.

        A dict is converted into a new :class:`ParameterSpace` that shares this
        space's shape and takes the matching sub-schema, if any. Any other
        value is stored as given.
        """
        if isinstance(value, dict):
            self._parameters[name] = ParameterSpace(
                value,
                schema=self.schema.get(name, None) if self.schema else None,
                shape=self.shape
            )
        else:
            # todo: check and/or pass on schema to child
            self._parameters[name] = value

    def pop(self, name, d=None):
        """
        Remove the given parameter from the parameter set and from its schema,
        and return its value.
        """
        value = self._parameters.pop(name, d)
        if self.schema:
            self.schema.pop(name, d)
        return value

    @property
    def is_homogeneous(self):
        """
        True if all of the lazy arrays within are homogeneous.
        """
        return all(value.is_homogeneous for value in self._parameters.values())

    def evaluate(self, mask=None, simplify=False):
        """
        Evaluate all lazy arrays contained in the parameter space, using the
        given mask.

        The evaluated values replace the lazy arrays in place, and the
        parameter space itself is returned. Raises `Exception` if the shape
        has not been set, and :class:`errors.InvalidParameterValueError` if a
        masked evaluation raises `ValueError`.
        """
        if self._shape is None:
            raise Exception("Must set shape of parameter space before evaluating")
        if mask is None:
            for name, value in self._parameters.items():
                if isinstance(value, ParameterSpace):
                    self._parameters[name].evaluate(mask, simplify)
                else:
                    self._parameters[name] = value.evaluate(simplify=simplify)
            self._evaluated_shape = self._shape
        else:
            for name, value in self._parameters.items():
                if isinstance(value, ParameterSpace):
                    self._parameters[name].evaluate(mask, simplify)
                else:
                    try:
                        if (
                            isinstance(value.base_value, RandomDistribution)
                            and value.base_value.rng.parallel_safe
                        ):
                            # can't partially evaluate if using parallel safe
                            value = value.evaluate()[mask]
                        else:
                            value = value._partially_evaluate(mask, simplify=simplify)
                        self._parameters[name] = value
                    except ValueError as err:
                        raise errors.InvalidParameterValueError(
                            f"{name} should not be of type {type(value)}") from err
            self._evaluated_shape = partial_shape(mask, self._shape)
        self._evaluated = True
        # should possibly update self.shape according to mask?
        return self

    def as_dict(self):
        """
        Return a plain dict containing the same keys and values as the
        parameter space. The values must first have been evaluated.
        """
        if not self._evaluated:
            raise Exception("Must call evaluate() method before calling ParameterSpace.as_dict()")
        D = {}
        for name, value in self._parameters.items():
            if isinstance(value, ParameterSpace):
                D[name] = value.as_dict()
            else:
                D[name] = value
            assert not isinstance(D[name], LazyArray)  # should all have been evaluated by now
        return D

    def __iter__(self):
        r"""
        Return an array-element-wise iterator over the parameter space.

        Each item in the iterator is a dict, containing the same keys as the
        :class:`ParameterSpace`. For the `i`\th dict returned by the iterator,
        each value is the `i`\th element of the corresponding lazy array in the
        parameter space.

        Example:

        >>> ps = ParameterSpace({'a': [2, 3, 5, 8], 'b': 7, 'c': lambda i: 3*i+2}, shape=(4,))
        >>> ps.evaluate()
        >>> for D in ps:
        ...     print(D)
        ...
        {'a': 2, 'c': 2, 'b': 7}
        {'a': 3, 'c': 5, 'b': 7}
        {'a': 5, 'c': 8, 'b': 7}
        {'a': 8, 'c': 11, 'b': 7}
        """
        if not self._evaluated:
            raise Exception("Must call evaluate() method before iterating over a ParameterSpace")
        for i in range(self._evaluated_shape[0]):
            D = {}
            for name, value in self._parameters.items():
                if isinstance(value, ParameterSpace):
                    # Nested spaces are expanded one level deep.
                    D[name] = {}
                    for cname, cvalue in value.items():
                        if is_listlike(cvalue):
                            D[name][cname] = cvalue[i]
                        else:
                            D[name][cname] = cvalue
                else:
                    if is_listlike(value):
                        D[name] = value[i]
                    else:
                        D[name] = value
                    assert not isinstance(D[name], LazyArray)  # should all have been evaluated by now
            yield D

    def columns(self):
        """
        For a 2D space, return a column-wise iterator over the parameter space.
        """
        if not self._evaluated:
            raise Exception("Must call evaluate() method before iterating over a ParameterSpace")
        assert len(self.shape) == 2
        if len(self._evaluated_shape) == 1:  # values will be one-dimensional
            yield self._parameters
        else:
            for j in range(self._evaluated_shape[1]):
                D = {}
                for name, value in self._parameters.items():
                    if is_listlike(value):
                        D[name] = value[:, j]
                    else:
                        D[name] = value
                    # should all have been evaluated by now
                    assert not isinstance(D[name], LazyArray)
                yield D

    def __eq__(self, other):
        """
        Two parameter spaces are equal if they hold the same number of
        parameters, with pairwise-equal (name, value) items in the same order,
        and have equal schemas and shapes.
        """
        if not isinstance(other, ParameterSpace):
            return NotImplemented
        # zip() stops at the shorter input, so the sizes must be compared
        # explicitly or a subset would compare equal to its superset.
        if len(self._parameters) != len(other._parameters):
            return False
        return (
            all(a == b for a, b in zip(self._parameters.items(),
                                       other._parameters.items()))
            and self.schema == other.schema
            and self._shape == other._shape
        )

    @property
    def parallel_safe(self):
        """
        True if any contained value, including those in nested parameter
        spaces, is based on a random distribution whose RNG is parallel-safe.
        """
        for value in self._parameters.values():
            if isinstance(value, ParameterSpace):
                if value.parallel_safe:
                    return True
                continue
            # Values without a base_value cannot be based on a distribution.
            base_value = getattr(value, "base_value", None)
            if isinstance(base_value, RandomDistribution) and base_value.rng.parallel_safe:
                return True
        return False

    @property
    def has_native_rngs(self):
        """
        Return True if the parameter set contains any NativeRNGs
        """
        return any(isinstance(rd.base_value.rng, NativeRNG)
                   for rd in self._random_distributions())

    def _random_distributions(self):
        """
        An iterator over those values contained in the PS that are
        derived from random distributions.

        Nested parameter spaces are searched too.
        """
        for value in self._parameters.values():
            if isinstance(value, ParameterSpace):
                yield from value._random_distributions()
            elif isinstance(getattr(value, "base_value", None), RandomDistribution):
                yield value

    def expand(self, new_shape, mask):
        """
        Increase the size of the ParameterSpace.

        Existing array values are mapped to the indices given in mask.
        New array values are set to NaN.
        """
        for name, value in self._parameters.items():
            if isinstance(value, ParameterSpace):
                value.expand(new_shape, mask)
            elif isinstance(value.base_value, np.ndarray):
                new_base_value = np.full(new_shape, np.nan)
                new_base_value[mask] = value.base_value
                self._parameters[name].base_value = new_base_value
        self.shape = new_shape

    def flatten(self, with_prefix=True):
        """
        Copy the contents of each nested parameter space into this one.

        With `with_prefix` true, a nested parameter `child` of `parent` is
        stored as `"parent.child"`; otherwise it is stored under its own name.
        """
        # Iterate over a copy, since entries are added to the dict in the loop.
        for name, value in self._parameters.copy().items():
            if isinstance(value, ParameterSpace):
                for cname, cvalue in value.items():
                    if with_prefix:
                        self._parameters["{}.{}".format(name, cname)] = cvalue
                    else:
                        self._parameters[cname] = cvalue
def simplify(value):
    """
    If `value` is a homogeneous array, return the single value that all
    elements share. Otherwise, pass the value through.

    Non-array values, 0-dimensional arrays and empty arrays are returned
    unchanged.
    """
    # The shape check is for Brian scalar quantities; the size check guards
    # the value[0] lookup below, which would fail on an empty array.
    if isinstance(value, np.ndarray) and len(value.shape) > 0 and value.size > 0:
        if (value == value[0]).all():
            return value[0]
    return value
    # alternative - need to benchmark
    # if np.any(arr != arr[0]):
    #    return arr
    # else:
    #    return arr[0]


class IonicSpecies(object):
    """
    Encapsulation of parameters for an ionic species

    Arguments:
        `ion_name`:
            name of the ion.
        `reversal_potential`:
            reversal potential for the ion.
        `internal_concentration`:
            optional internal concentration (default `None`).
        `external_concentration`:
            optional external concentration (default `None`).

    Each argument is stored unchanged as an attribute of the same name.
    """

    def __init__(self, ion_name, reversal_potential,
                 internal_concentration=None, external_concentration=None):
        self.ion_name = ion_name
        self.reversal_potential = reversal_potential
        self.internal_concentration = internal_concentration
        self.external_concentration = external_concentration