Source code for pyNN.common.populations

# encoding: utf-8
"""
Common implementation of ID, Population, PopulationView and Assembly classes.

These base classes should be sub-classed by the backend-specific classes.

:copyright: Copyright 2006-2024 by the PyNN team, see AUTHORS.
:license: CeCILL, see LICENSE for details.
"""

import logging
import operator
import warnings
from itertools import chain
from functools import reduce
from collections import defaultdict
import numpy as np
from .. import random, recording, errors, standardmodels, core, space, descriptions
from ..models import BaseCellType
from ..parameters import ParameterSpace, LazyArray, simplify as simplify_parameter_array
from ..recording import files


deprecated = core.deprecated
logger = logging.getLogger("PyNN")


def is_conductance(target_cell):
    """
    Returns True if the target cell uses conductance-based synapses, False if
    it uses current-based synapses, and None if the synapse-basis cannot be
    determined.
    """
    if hasattr(target_cell, 'local') and target_cell.local and hasattr(target_cell, 'celltype'):
        is_conductance = target_cell.celltype.conductance_based
    else:
        is_conductance = None
    return is_conductance


class IDMixin(object):
    """
    Instead of storing ids as integers, we store them as ID objects,
    which allows a syntax like:
        p[3,4].tau_m = 20.0
    where p is a Population object.
    """
    # Simulator ID classes should inherit both from the base type of the ID
    # (e.g., int or long) and from IDMixin.

    def __getattr__(self, name):
        if name == "parent":
            raise Exception("parent is not set")
        elif name == "set":
            err_msg = "For individual cells, set values using the parameter name directly, " \
                     "e.g. population[0].tau_m = 20.0, or use 'set' on a population view, " \
                     "e.g. population[0:1].set(tau_m=20.0)"
            raise AttributeError(err_msg)
        try:
            val = self.get_parameters()[name]
        except KeyError:
            raise errors.NonExistentParameterError(name,
                                                   self.celltype.__class__.__name__,
                                                   self.celltype.get_parameter_names())
        return val

    def __setattr__(self, name, value):
        if name == "parent":
            object.__setattr__(self, name, value)
        elif self.celltype.has_parameter(name):
            self.set_parameters(**{name: value})
        else:
            object.__setattr__(self, name, value)

    def set_parameters(self, **parameters):
        """
        Set cell parameters, given as a sequence of parameter=value arguments.
        """
        # if some of the parameters are computed from the values of other
        # parameters, need to get and translate all parameters
        if self.local:
            self.as_view().set(**parameters)
        else:
            raise errors.NotLocalError(
                "Cannot set parameters for a cell that does not exist on this node.")

    def get_parameters(self):
        """Return a dict of all cell parameters."""
        if self.local:
            parameter_names = self.celltype.get_parameter_names()
            return dict((k, v)
                        for k, v in zip(parameter_names, self.as_view().get(parameter_names)))
        else:
            raise errors.NotLocalError(
                "Cannot obtain parameters for a cell that does not exist on this node.")

    @property
    def celltype(self):
        return self.parent.celltype

    @property
    def is_standard_cell(self):
        return isinstance(self.celltype, standardmodels.StandardCellType)

    def _set_position(self, pos):
        """
        Set the cell position in 3D space.

        Cell positions are stored in an array in the parent Population.
        """
        assert isinstance(pos, (tuple, np.ndarray))
        assert len(pos) == 3
        self.parent._set_cell_position(self, pos)

    def _get_position(self):
        """
        Return the cell position in 3D space.

        Cell positions are stored in an array in the parent Population, if any,
        or within the ID object otherwise. Positions are generated the first
        time they are requested and then cached.
        """
        return self.parent._get_cell_position(self)

    position = property(_get_position, _set_position)

    @property
    def local(self):
        return self.parent.is_local(self)

    def inject(self, current_source):
        """Inject current from a current source object into the cell."""
        current_source.inject_into([self])

    def get_initial_value(self, variable):
        """Get the initial value of a state variable of the cell."""
        return self.parent._get_cell_initial_value(self, variable)

    def set_initial_value(self, variable, value):
        """Set the initial value of a state variable of the cell."""
        self.parent._set_cell_initial_value(self, variable, value)

    def as_view(self):
        """Return a PopulationView containing just this cell."""
        index = self.parent.id_to_index(self)
        return self.parent[index:index + 1]


class BasePopulation(object):
    _record_filter = None

    def __getitem__(self, index):
        """
        Return either a single cell (ID object) from the Population, if `index`
        is an integer, or a subset of the cells (PopulationView object), if
        `index` is a slice or array.

        Note that __getitem__ is called when using [] access, e.g.
            p = Population(...)
            p[2] is equivalent to p.__getitem__(2).
            p[3:6] is equivalent to p.__getitem__(slice(3, 6))
        """
        if isinstance(index, (int, np.integer)):
            return self.all_cells[index]
        elif isinstance(index, (slice, list, np.ndarray)):
            return self._get_view(index)
        elif isinstance(index, tuple):
            return self._get_view(list(index))
        else:
            index_type = type(index).__name__
            raise TypeError(
                f"indices must be integers, slices, lists, arrays or tuples, not {index_type}")

    def __len__(self):
        """Return the total number of cells in the population (all nodes)."""
        return self.size

    @property
    def local_size(self):
        """Return the number of cells in the population on the local MPI node"""
        return len(self.local_cells)  # would self._mask_local.sum() be faster?

    def __iter__(self):
        """Iterator over cell ids on the local node."""
        return iter(self.local_cells)

    @property
    def conductance_based(self):
        """
        Indicates whether the post-synaptic response is modelled as a change
        in conductance or a change in current.
        """
        return self.celltype.conductance_based

    @property
    def receptor_types(self):
        return self.celltype.receptor_types

    def is_local(self, id):
        """
        Indicates whether the cell with the given ID exists on the local MPI node.
        """
        assert id.parent is self
        index = self.id_to_index(id)
        return self._mask_local[index]

    def all(self):
        """Iterator over cell ids on all MPI nodes."""
        return iter(self.all_cells)

    def __add__(self, other):
        """
        A Population/PopulationView can be added to another Population,
        PopulationView or Assembly, returning an Assembly.
        """
        assert isinstance(other, BasePopulation)
        return self._assembly_class(self, other)

    def _get_cell_position(self, id):
        index = self.id_to_index(id)
        return self.positions[:, index]

    def _set_cell_position(self, id, pos):
        index = self.id_to_index(id)
        self.positions[:, index] = pos

    @property
    def position_generator(self):  # "generator" is a misleading name, has no yield statement
        def gen(i):
            return self.positions.T[i]
        return gen

    def _get_cell_initial_value(self, id, variable):
        if variable in self.initial_values:
            assert isinstance(self.initial_values[variable], LazyArray)
            index = self.id_to_local_index(id)
            return self.initial_values[variable][index]
        else:
            logger.warning(
                "Variable '{}' is not in initial values, returning 0.0".format(variable))
            return 0.0

    def _set_cell_initial_value(self, id, variable, value):
        assert isinstance(self.initial_values[variable], LazyArray)
        index = self.id_to_local_index(id)
        self.initial_values[variable][index] = value

    def nearest(self, position):
        """Return the neuron closest to the specified position."""
        # doesn't always work correctly if a position is equidistant between
        # two neurons, i.e. 0.5 should be rounded up, but it isn't always.
        # also doesn't take account of periodic boundary conditions
        pos = np.array([position] * self.positions.shape[1]).transpose()
        dist_arr = (self.positions - pos)**2
        distances = dist_arr.sum(axis=0)
        nearest = distances.argmin()
        return self[nearest]

    def sample(self, n, rng=None):
        """
        Randomly sample `n` cells from the Population, and return a
        PopulationView object.
        """
        assert isinstance(n, int)
        if not rng:
            rng = random.NumpyRNG()
        indices = rng.permutation(np.arange(len(self), dtype=int))[0:n]
        logger.debug("The %d cells selected have indices %s" % (n, indices))
        logger.debug("%s.sample(%s)", self.label, n)
        return self._get_view(indices)

    def get(self, parameter_names, gather=False, simplify=True):
        """
        Get the values of the given parameters for every local cell in the
        population, or, if gather=True, for all cells in the population.

        Values will be expressed in the standard PyNN units (i.e. millivolts,
        nanoamps, milliseconds, microsiemens, nanofarads, event per second).
        """
        # if all the cells have the same value for a parameter, should
        # we return just the number, rather than an array?
        if isinstance(parameter_names, str):
            parameter_names = (parameter_names,)
            return_list = False
        else:
            return_list = True
        parameter_space = self._get_parameters(*parameter_names)
        parameter_space.shape = (self.local_size,)

        # re: simplify - what if parameter space is homogeneous on some nodes but not on others?
        # this also causes problems if the population size matches the number of MPI nodes
        parameter_space.evaluate(simplify=simplify)
        parameter_space.flatten()
        parameters = parameter_space.as_dict()

        if gather is True and self._simulator.state.num_processes > 1:
            # seems inefficient to do it in a loop - should do as single operation
            for name in parameter_names:
                values = parameters[name]
                if isinstance(values, np.ndarray):
                    all_values = {self._simulator.state.mpi_rank: values.tolist()}
                    local_indices = np.arange(self.size,)[self._mask_local].tolist()
                    all_indices = {self._simulator.state.mpi_rank: local_indices}
                    all_values = recording.gather_dict(all_values)
                    all_indices = recording.gather_dict(all_indices)
                    if self._simulator.state.mpi_rank == 0:
                        values = reduce(operator.add, all_values.values())
                        indices = reduce(operator.add, all_indices.values())
                        idx = np.argsort(indices)
                        values = np.array(values)[idx]
                parameters[name] = values
        try:
            values = [parameters[name] for name in parameter_names]
        except KeyError as err:
            raise errors.NonExistentParameterError(
                err.args[0], self.celltype, self.celltype.get_parameter_names())
        if return_list:
            return values
        else:
            assert len(parameter_names) == 1
            return values[0]

    def set(self, **parameters):
        """
        Set one or more parameters for every cell in the population.

        Values passed to set() may be:
            (1) single values
            (2) RandomDistribution objects
            (3) lists/arrays of values of the same size as the population
            (4) mapping functions, where a mapping function accepts a single
                argument (the cell index) and returns a single value.

        Here, a "single value" may be either a single number or a list/array of
        numbers (e.g. for spike times). Values should be expressed in the
        standard PyNN units (i.e. millivolts, nanoamps, milliseconds,
        microsiemens, nanofarads, event per second).

        Examples::

            p.set(tau_m=20.0, v_rest=-65).
            p.set(spike_times=[0.3, 0.7, 0.9, 1.4])
            p.set(cm=rand_distr, tau_m=lambda i: 10 + i/10.0)
        """
        # TODO: add example using of function of (x,y,z) and Population.position_generator
        if self.local_size > 0:
            if (isinstance(self.celltype, standardmodels.StandardCellType)
                    and self.celltype.computed_parameters_include(parameters)
                    and not isinstance(self.celltype, standardmodels.cells.SpikeSourceArray)):
                # the last condition above is a bit of hack to avoid calling expand() unecessarily
                # need to get existing parameter space of models so we can perform calculations
                native_names = self.celltype.get_native_names()
                parameter_space = self.celltype.reverse_translate(
                    self._get_native_parameters(*native_names))
                if self.local_size != self.size:
                    parameter_space.expand((self.size,), self._mask_local)
                parameter_space.update(**parameters)
            else:
                parameter_space = ParameterSpace(parameters,
                                                 self.celltype.get_schema(),
                                                 (self.size,),
                                                 self.celltype.__class__)
            if isinstance(self.celltype, standardmodels.StandardCellType):
                parameter_space = self.celltype.translate(parameter_space)
            assert parameter_space.shape == (self.size,), "{} != {}".format(
                parameter_space.shape, self.size)
            self._set_parameters(parameter_space)

    @deprecated("set(parametername=value_array)")
    def tset(self, parametername, value_array):
        """
        'Topographic' set. Set the value of parametername to the values in
        value_array, which must have the same dimensions as the Population.
        """
        self.set(**{parametername: value_array})

    @deprecated("set(parametername=rand_distr)")
    def rset(self, parametername, rand_distr):
        """
        'Random' set. Set the value of parametername to a value taken from
        rand_distr, which should be a RandomDistribution object.
        """
        # Note that we generate enough random numbers for all cells on all nodes
        # but use only those relevant to this node. This ensures that the
        # sequence of random numbers does not depend on the number of nodes,
        # provided that the same rng with the same seed is used on each node.
        self.set(**{parametername: rand_distr})

    def initialize(self, **initial_values):
        """
        Set initial values of state variables, e.g. the membrane potential.

        Values passed to initialize() may be:
            (1) single numeric values (all neurons set to the same value)
            (2) RandomDistribution objects
            (3) lists/arrays of numbers of the same size as the population
            (4) mapping functions, where a mapping function accepts a single
                argument (the cell index) and returns a single number.

        Values should be expressed in the standard PyNN units (i.e. millivolts,
        nanoamps, milliseconds, microsiemens, nanofarads, event per second).

        Examples::

            p.initialize(v=-70.0)
            p.initialize(v=rand_distr, gsyn_exc=0.0)
            p.initialize(v=lambda i: -65 + i/10.0)
        """
        for variable, value in initial_values.items():
            logger.debug("In Population '%s', initialising %s to %s" %
                         (self.label, variable, value))
            initial_value = LazyArray(value, shape=(self.size,), dtype=float)
            self._set_initial_value_array(variable, initial_value)
            self.initial_values[variable] = initial_value

    def find_units(self, variable):
        """
        Returns units of the specified variable or parameter, as a string.
        Works for all the recordable variables and neuron parameters of all standard models.
        """
        return self.celltype.units[variable.name]

    def annotate(self, **annotations):
        self.annotations.update(annotations)

    def can_record(self, variable, location=None):
        """Determine whether `variable` can be recorded from this population."""
        return self.celltype.can_record(variable, location)

    @property
    def injectable(self):
        return self.celltype.injectable

    def record(self, variables, to_file=None, sampling_interval=None, locations=None):
        """
        Record the specified variable or variables for all cells in the
        Population or view.

        `variables` may be either a single variable name or a list of variable
        names. For a given celltype class, `celltype.recordable` contains a list of
        variables that can be recorded for that celltype.

        If specified, `to_file` should be either a filename or a Neo IO instance and `write_data()`
        will be automatically called when `end()` is called.

        `sampling_interval` should be a value in milliseconds, and an integer
        multiple of the simulation timestep.
        """
        if variables is None:  # reset the list of things to record
            # note that if record(None) is called on a view of a population
            # recording will be reset for the entire population, not just the view
            self.recorder.reset()
        else:
            logger.debug("%s.record('%s')", self.label, variables)
            if self._record_filter is None:
                self.recorder.record(variables, self.all_cells, sampling_interval, locations)
            else:
                self.recorder.record(variables, self._record_filter, sampling_interval, locations)
        if isinstance(to_file, str):
            self.recorder.file = to_file
            self._simulator.state.write_on_end.append((self, variables, self.recorder.file))

    @deprecated("record('v')")
    def record_v(self, to_file=True):
        """
        Record the membrane potential for all cells in the Population.
        """
        self.record('v', to_file)

    @deprecated("record(['gsyn_exc', 'gsyn_inh'])")
    def record_gsyn(self, to_file=True):
        """
        Record synaptic conductances for all cells in the Population.
        """
        self.record(['gsyn_exc', 'gsyn_inh'], to_file)

    def write_data(self, io, variables='all', gather=True, clear=False, annotations=None, locations=None):
        """
        Write recorded data to file, using one of the file formats supported by
        Neo.

        `io`:
            a Neo IO instance
        `variables`:
            either a single variable name or a list of variable names.
            Variables must have been previously recorded, otherwise an
            Exception will be raised.

        For parallel simulators, if `gather` is True, all data will be gathered
        to the master node and a single output file created there. Otherwise, a
        file will be written on each node, containing only data from the cells
        simulated on that node.

        If `clear` is True, recorded data will be deleted from the `Population`.

        `annotations` should be a dict containing simple data types such as
        numbers and strings. The contents will be written into the output data
        file as metadata.
        """
        logger.debug("Population %s is writing %s to %s [gather=%s, clear=%s]" % (
            self.label, variables, io, gather, clear))
        self.recorder.write(variables, io, gather, self._record_filter, clear=clear,
                            annotations=annotations, locations=locations)

    def get_data(self, variables='all', gather=True, clear=False, locations=None):
        """
        Return a Neo `Block` containing the data (spikes, state variables)
        recorded from the Population.

        `variables` - either a single variable name or a list of variable names
                      Variables must have been previously recorded, otherwise an
                      Exception will be raised.

        For parallel simulators, if `gather` is True, all data will be gathered
        to all nodes and the Neo `Block` will contain data from all nodes.
        Otherwise, the Neo `Block` will contain only data from the cells
        simulated on the local node.

        If `clear` is True, recorded data will be deleted from the `Population`.
        """
        return self.recorder.get(variables, gather, self._record_filter, clear, locations=locations)

    @deprecated("write_data(file, 'spikes')")
    def printSpikes(self, file, gather=True, compatible_output=True):
        self.write_data(file, 'spikes', gather)

    @deprecated("get_data('spikes')")
    def getSpikes(self, gather=True, compatible_output=True):
        return self.get_data('spikes', gather)

    @deprecated("write_data(file, 'v')")
    def print_v(self, file, gather=True, compatible_output=True):
        self.write_data(file, 'v', gather)

    @deprecated("get_data('v')")
    def get_v(self, gather=True, compatible_output=True):
        return self.get_data('v', gather)

    @deprecated("write_data(file, ['gsyn_exc', 'gsyn_inh'])")
    def print_gsyn(self, file, gather=True, compatible_output=True):
        self.write_data(file, ['gsyn_exc', 'gsyn_inh'], gather)

    @deprecated("get_data(['gsyn_exc', 'gsyn_inh'])")
    def get_gsyn(self, gather=True, compatible_output=True):
        return self.get_data(['gsyn_exc', 'gsyn_inh'], gather)

    def get_spike_counts(self, gather=True):
        """
        Returns a dict containing the number of spikes for each neuron.

        The dict keys are neuron IDs, not indices.
        """
        # arguably, we should use indices
        return self.recorder.count('spikes', gather, self._record_filter)

    @deprecated("mean_spike_count()")
    def meanSpikeCount(self, gather=True):
        return self.mean_spike_count(gather)

    def mean_spike_count(self, gather=True):
        """
        Returns the mean number of spikes per neuron.
        """
        spike_counts = self.get_spike_counts(gather)
        total_spikes = sum(spike_counts.values())
        if self._simulator.state.mpi_rank == 0 or not gather:
            # should maybe use allgather, and get the numbers on all nodes
            if len(spike_counts) > 0:
                return float(total_spikes) / len(spike_counts)
            else:
                return 0
        else:
            return np.nan

    def inject(self, current_source):
        """
        Connect a current source to all cells in the Population.
        """
        if not self.celltype.injectable:
            raise TypeError("Can't inject current into a spike source.")
        current_source.inject_into(self)

    # name should be consistent with saving/writing data,
    # i.e. save_data() and save_positions() or write_data() and write_positions()
    def save_positions(self, file):
        """
        Save positions to file. The output format is ``index x y z``
        """
        if isinstance(file, str):
            file = recording.files.StandardTextFile(file, mode='w')
        cells = self.all_cells
        result = np.empty((len(cells), 4))
        result[:, 0] = np.array([self.id_to_index(id) for id in cells])
        result[:, 1:4] = self.positions.T
        if self._simulator.state.mpi_rank == 0:
            file.write(result, {'population': self.label})
            file.close()


class Population(BasePopulation):
    """
    A group of neurons all of the same type. "Population" is used as a generic
    term intended to include layers, columns, nuclei, etc., of cells.

    Arguments:
        `size`:
            number of cells in the Population. For backwards-compatibility,
            `size` may also be a tuple giving the dimensions of a grid,
            e.g. ``size=(10,10)`` is equivalent to ``size=100`` with ``structure=Grid2D()``.

        `cellclass`:
            a cell type (a class inheriting from :class:`pyNN.models.BaseCellType`).

        `cellparams`:
            a dict, or other mapping, containing parameters, which is passed to
            the neuron model constructor.

        `structure`:
            a :class:`pyNN.space.Structure` instance, used to specify the
            positions of neurons in space.

        `initial_values`:
            a dict, or other mapping, containing initial values for the neuron
            state variables.

        `label`:
            a name for the population. One will be auto-generated if this is not
            supplied.
    """
    _nPop = 0

    def __init__(self, size, cellclass, cellparams=None, structure=None,
                 initial_values={}, label=None):
        """
        Create a population of neurons all of the same type.
        """
        if not hasattr(self, "_simulator"):
            err_msg = "`common.Population` should not be instantiated directly. " \
                     "You should import Population from a PyNN backend module, " \
                     "e.g. pyNN.nest or pyNN.neuron"
            raise Exception(err_msg)
        if not isinstance(size, (int, np.integer)):
            # also allow a single integer, for a 1D population
            err_msg = "`size` must be an integer or a tuple of ints."
            if not isinstance(size, tuple):
                raise ValueError(f"{err_msg} You have supplied a {type(size)}")
            # check the things inside are ints
            for e in size:
                if not isinstance(e, int):
                    raise ValueError(f"{err_msg} Element '{e}' is not an int")
            if structure is not None:
                raise ValueError(
                    "If you specify `size` as a tuple you may not specify structure.")
            if len(size) == 1:
                structure = space.Line()
            elif len(size) == 2:
                nx, ny = size
                structure = space.Grid2D(nx / float(ny))
            elif len(size) == 3:
                nx, ny, nz = size
                structure = space.Grid3D(nx / float(ny), nx / float(nz))
            else:
                raise Exception(
                    "A maximum of 3 dimensions is allowed. "
                    "What do you think this is, string theory?")
            size = int(reduce(operator.mul, size))
        self.size = size
        self.label = label or 'population%d' % Population._nPop
        self._structure = structure or space.Line()
        self._positions = None
        self._is_sorted = True
        if isinstance(cellclass, BaseCellType):
            self.celltype = cellclass
            # cellparams being retained for backwards compatibility, but use is deprecated
            assert cellparams is None
        elif issubclass(cellclass, BaseCellType):
            warnings.warn(
                "Passing celltype class and parameters separately is deprecated. "
                "Please instantiate the celltype with the parameters before "
                "passing to the Population",
                category=DeprecationWarning
            )
            self.celltype = cellclass(**cellparams)
        else:
            raise TypeError(
                "cellclass must be an instance or subclass of BaseCellType"
                f"not a {type(cellclass)}")
        self.annotations = {}
        self.recorder = self._recorder_class(self)
        # Build the arrays of cell ids
        # Cells on the local node are represented as ID objects, other cells by integers
        # All are stored in a single numpy array for easy lookup by address
        # The local cells are also stored in a list, for easy iteration
        self._create_cells()
        self.first_id = self.all_cells[0]
        self.last_id = self.all_cells[-1]
        self.initial_values = {}
        all_initial_values = self.celltype.default_initial_values.copy()
        all_initial_values.update(initial_values)
        self.initialize(**all_initial_values)
        Population._nPop += 1

    def __repr__(self):
        return "Population(%d, %r, structure=%r, label=%r)" % (
            self.size, self.celltype, self.structure, self.label)

    @property
    def local_cells(self):
        """
        An array containing cell ids for the local node.
        """
        return self.all_cells[self._mask_local]

[docs] def id_to_index(self, id): """ Given the ID(s) of cell(s) in the Population, return its (their) index (order in the Population). >>> assert p.id_to_index(p[5]) == 5 """ if not np.iterable(id): if not self.first_id <= id <= self.last_id: raise ValueError("id should be in the range [%d,%d], actually %d" % ( self.first_id, self.last_id, id)) return int(id - self.first_id) # this assumes ids are consecutive else: if isinstance(id, PopulationView): id = id.all_cells id = np.array(id) if (self.first_id > id.min()) or (self.last_id < id.max()): raise ValueError("ids should be in the range [%d,%d], actually [%d, %d]" % ( self.first_id, self.last_id, id.min(), id.max())) return (id - self.first_id).astype(int) # this assumes ids are consecutive
[docs] def id_to_local_index(self, id): """ Given the ID(s) of cell(s) in the Population, return its (their) index (order in the Population), counting only cells on the local MPI node. """ if self._simulator.state.num_processes > 1: return self.local_cells.tolist().index(id) # probably very slow # return np.nonzero(self.local_cells == id)[0][0] # possibly faster? # another idea - get global index, use idx-sum(mask_local[:idx])? else: return self.id_to_index(id)
def _get_structure(self): """The spatial structure of the Population.""" return self._structure def _set_structure(self, structure): assert isinstance(structure, space.BaseStructure) if self._structure is None or structure != self._structure: # setting a new structure invalidates previously calculated positions self._positions = None self._structure = structure structure = property(fget=_get_structure, fset=_set_structure) # arguably structure should be read-only, # i.e. it is not possible to change it after Population creation def _get_positions(self): """ Try to return self._positions. If it does not exist, create it and then return it. """ if self._positions is None: self._positions = self.structure.generate_positions(self.size) assert self._positions.shape == (3, self.size) return self._positions def _set_positions(self, pos_array): assert isinstance(pos_array, np.ndarray) assert pos_array.shape == (3, self.size), "%s != %s" % (pos_array.shape, (3, self.size)) self._positions = pos_array.copy() # take a copy in case pos_array is changed later self._structure = None # explicitly setting positions destroys any previous structure positions = property(_get_positions, _set_positions, doc="""A 3xN array (where N is the number of neurons in the Population) giving the x,y,z coordinates of all the neurons (soma, in the case of non-point models).""")
[docs] def describe(self, template='population_default.txt', engine='default'): """ Returns a human-readable description of the population. The output may be customized by specifying a different template together with an associated template engine (see :mod:`pyNN.descriptions`). If template is None, then a dictionary containing the template context will be returned. """ context = { "label": self.label, "celltype": self.celltype.describe(template=None), "structure": None, "size": self.size, "size_local": len(self.local_cells), "first_id": self.first_id, "last_id": self.last_id, } context.update(self.annotations) if len(self.local_cells) > 0: first_id = self.local_cells[0] context.update({ "local_first_id": first_id, "cell_parameters": {} # first_id.get_parameters(), }) if self.structure: context["structure"] = self.structure.describe(template=None) return descriptions.render(engine, template, context)
class PopulationView(BasePopulation): """ A view of a subset of neurons within a Population. In most ways, Populations and PopulationViews have the same behaviour, i.e. they can be recorded, connected with Projections, etc. It should be noted that any changes to neurons in a PopulationView will be reflected in the parent Population and vice versa. It is possible to have views of views. Arguments: selector: a slice or numpy mask array. The mask array should either be a boolean array of the same size as the parent, or an integer array containing cell indices, i.e. if p.size == 5:: PopulationView(p, array([False, False, True, False, True])) PopulationView(p, array([2,4])) PopulationView(p, slice(2,5,2)) will all create the same view. """ def __init__(self, parent, selector, label=None): """ Create a view of a subset of neurons within a parent Population or PopulationView. """ if not hasattr(self, "_simulator"): err_msg = "`common.PopulationView` should not be instantiated directly. " \ "You should import PopulationView from a PyNN backend module, " \ "e.g. pyNN.nest or pyNN.neuron" raise Exception(err_msg) self.parent = parent self.mask = selector # later we can have fancier selectors, for now we just have numpy masks # maybe just redefine __getattr__ instead of the following... self.celltype = self.parent.celltype # If the mask is a slice, IDs will be consecutives without duplication. # If not, then we need to remove duplicated IDs if not isinstance(self.mask, slice): if isinstance(self.mask, list): self.mask = np.array(self.mask) if self.mask.dtype is np.dtype('bool'): if len(self.mask) != len(self.parent): raise Exception("Boolean masks should have the size of Parent Population") self.mask = np.arange(len(self.parent))[self.mask] else: if len(np.unique(self.mask)) != len(self.mask): logger.warning( "PopulationView can contain each ID only once, duplicates are removed") self.mask = np.unique(self.mask) self.mask.sort() # needed by NEST. # Maybe emit a warning or exception if mask is not already ordered? self.all_cells = self.parent.all_cells[self.mask] idx = np.argsort(self.all_cells) self._is_sorted = np.all(idx == np.arange(len(self.all_cells))) self.size = len(self.all_cells) self.label = label or "view of '%s' with size %s" % (parent.label, self.size) self._mask_local = self.parent._mask_local[self.mask] self.local_cells = self.all_cells[self._mask_local] # only works if we assume all_cells is sorted, otherwise could use min() self.first_id = np.min(self.all_cells) self.last_id = np.max(self.all_cells) self.annotations = {} self.recorder = self.parent.recorder self._record_filter = self.all_cells def __repr__(self): return "PopulationView(parent=%r, selector=%r, label=%r)" % ( self.parent, self.mask, self.label) @property def initial_values(self): # this is going to be complex - if we keep initial_values as a dict, # need to return a dict-like object that takes account of self.mask raise NotImplementedError @property def structure(self): """The spatial structure of the parent Population.""" return self.parent.structure # should we allow setting structure for a PopulationView? Maybe if the # parent has some kind of CompositeStructure? @property def positions(self): # make positions N,3 instead of 3,N to avoid all this transposing? return self.parent.positions.T[self.mask].T
[docs] def id_to_index(self, id): """ Given the ID(s) of cell(s) in the PopulationView, return its/their index/indices (order in the PopulationView). >>> assert pv.id_to_index(pv[3]) == 3 """ if not np.iterable(id): if self._is_sorted: if id not in self.all_cells: raise IndexError("ID %s not present in the View" % id) return np.searchsorted(self.all_cells, id) else: result = np.where(self.all_cells == id)[0] if len(result) == 0: raise IndexError("ID %s not present in the View" % id) else: return result else: if self._is_sorted: return np.searchsorted(self.all_cells, id) else: result = np.array([], dtype=int) for item in id: data = np.where(self.all_cells == item)[0] if len(data) == 0: raise IndexError("ID %s not present in the View" % item) elif len(data) > 1: raise Exception("ID %s is duplicated in the View" % item) else: result = np.append(result, data) return result
@property def grandparent(self): """ Returns the parent Population at the root of the tree (since the immediate parent may itself be a PopulationView). The name "grandparent" is of course a little misleading, as it could be just the parent, or the great, great, great, ..., grandparent. """ if hasattr(self.parent, "parent"): return self.parent.grandparent else: return self.parent
[docs] def index_in_grandparent(self, indices): """ Given an array of indices, return the indices in the parent population at the root of the tree. """ indices_in_parent = np.arange(self.parent.size)[self.mask][indices] if hasattr(self.parent, "parent"): return self.parent.index_in_grandparent(indices_in_parent) else: return indices_in_parent
[docs] def index_from_parent_index(self, indices): """ Given an index(indices) in the parent population, return the index(indices) within this view. """ # todo: add check that all indices correspond to cells that are in this view if isinstance(self.mask, slice): start = self.mask.start or 0 step = self.mask.step or 1 return (indices - start) / step else: if isinstance(indices, int): return np.nonzero(self.mask == indices)[0][0] elif isinstance(indices, np.ndarray): # Lots of ways to do this. Some profiling is in order. # - https://stackoverflow.com/questions/16992713/translate-every-element-in-numpy-array-according-to-key # noqa:E501 # - https://stackoverflow.com/questions/3403973/fast-replacement-of-values-in-a-numpy-array # noqa:E501 # - https://stackoverflow.com/questions/13572448/replace-values-of-a-numpy-index-array-with-values-of-a-list # noqa:E501 parent_indices = self.mask # assert mask is sorted view_indices = np.arange(self.size) index = np.digitize(indices, parent_indices, right=True) return view_indices[index] else: raise ValueError("indices must be an integer or an array of integers")
def __eq__(self, other): """ Determine whether two views are the same. """ return not self.__ne__(other) def __ne__(self, other): """ Determine whether two views are different. """ # We can't use the self.mask, as different masks can select the same cells # (e.g. slices vs arrays), therefore we have to use self.all_cells if isinstance(other, PopulationView): return ( self.parent != other.parent or not np.array_equal(self.all_cells, other.all_cells) ) elif isinstance(other, Population): return ( self.parent != other or not np.array_equal(self.all_cells, other.all_cells) ) else: return True
[docs] def describe(self, template='populationview_default.txt', engine='default'): """ Returns a human-readable description of the population view. The output may be customized by specifying a different template togther with an associated template engine (see ``pyNN.descriptions``). If template is None, then a dictionary containing the template context will be returned. """ context = {"label": self.label, "parent": self.parent.label, "mask": self.mask, "size": self.size} context.update(self.annotations) return descriptions.render(engine, template, context)
class Assembly(object): """ A group of neurons, may be heterogeneous, in contrast to a Population where all the neurons are of the same type. Arguments: populations: Populations or PopulationViews kwargs: May contain a keyword argument 'label' """ _count = 0 def __init__(self, *populations, **kwargs): """ Create an Assembly of Populations and/or PopulationViews. """ if not hasattr(self, "_simulator"): err_msg = "`common.Assembly` should not be instantiated directly. " \ "You should import Assembly from a PyNN backend module, " \ "e.g. pyNN.nest or pyNN.neuron" raise Exception(err_msg) if kwargs: assert list(kwargs.keys()) == ['label'] self.populations = [] for p in populations: self._insert(p) self.label = kwargs.get('label', 'assembly%d' % Assembly._count) assert isinstance(self.label, str), "label must be a string" self.annotations = {} Assembly._count += 1 def __repr__(self): return "Assembly(*%r, label=%r)" % (self.populations, self.label) def _insert(self, element): if not isinstance(element, BasePopulation): raise TypeError("argument is a %s, not a Population." % type(element).__name__) if isinstance(element, PopulationView): if element.parent not in self.populations: double = False for p in self.populations: data = np.concatenate((p.all_cells, element.all_cells)) if len(np.unique(data)) != len(p.all_cells) + len(element.all_cells): logger.warning( 'Cannnot add a PopulationView to an Assembly ' 'containing elements already present') double = True # Should we automatically remove duplicated IDs ? break if not double: self.populations.append(element) else: logger.warning( 'Cannot add a PopulationView to an Assembly when parent Population is there') elif isinstance(element, BasePopulation): if element not in self.populations: self.populations.append(element) else: logger.warning('Adding a Population twice in an Assembly is not possible') @property def local_cells(self): result = self.populations[0].local_cells for p in self.populations[1:]: result = np.concatenate((result, p.local_cells)) return result @property def all_cells(self): result = self.populations[0].all_cells for p in self.populations[1:]: result = np.concatenate((result, p.all_cells)) return result
[docs] def all(self): """Iterator over cell ids on all nodes.""" return iter(self.all_cells)
@property def _is_sorted(self): idx = np.argsort(self.all_cells) return np.all(idx == np.arange(len(self.all_cells))) @property def _homogeneous_synapses(self): cb = [p.celltype.conductance_based for p in self.populations] return all(cb) or not any(cb) @property def conductance_based(self): """ `True` if the post-synaptic response is modelled as a change in conductance, `False` if a change in current. """ return all(p.celltype.conductance_based for p in self.populations) @property def receptor_types(self): """ Return a list of receptor types that are common to all populations within the assembly. """ rts = self.populations[0].celltype.receptor_types if len(self.populations) > 1: rts = set(rts) for p in self.populations[1:]: rts = rts.intersection(set(p.celltype.receptor_types)) return list(rts)
[docs] def find_units(self, variable): """ Returns units of the specified variable or parameter, as a string. Works for all the recordable variables and neuron parameters of all standard models. """ units = set(p.find_units(variable) for p in self.populations) if len(units) > 1: raise ValueError("Inconsistent units") return units
@property def _mask_local(self): result = self.populations[0]._mask_local for p in self.populations[1:]: result = np.concatenate((result, p._mask_local)) return result @property def first_id(self): return np.min(self.all_cells) @property def last_id(self): return np.max(self.all_cells)
[docs] def id_to_index(self, id): """ Given the ID(s) of cell(s) in the Assembly, return its (their) index (order in the Assembly):: >>> assert p.id_to_index(p[5]) == 5 >>> assert p.id_to_index(p.index([1, 2, 3])) == [1, 2, 3] """ all_cells = self.all_cells if not np.iterable(id): if self._is_sorted: return np.searchsorted(all_cells, id) else: result = np.where(all_cells == id)[0] if len(result) == 0: raise IndexError("ID %s not present in the View" % id) else: return result else: if self._is_sorted: return np.searchsorted(all_cells, id) else: result = np.array([], dtype=int) for item in id: data = np.where(all_cells == item)[0] if len(data) == 0: raise IndexError("ID %s not present in the Assembly" % item) elif len(data) > 1: raise Exception("ID %s is duplicated in the Assembly" % item) else: result = np.append(result, data) return result
@property def positions(self): result = self.populations[0].positions for p in self.populations[1:]: result = np.hstack((result, p.positions)) return result @property def size(self): return sum(p.size for p in self.populations)
[docs] def __iter__(self): """ Iterator over cells in all populations within the Assembly, for cells on the local MPI node. """ iterators = [iter(p) for p in self.populations] return chain(*iterators)
[docs] def __len__(self): """Return the total number of cells in the population (all nodes).""" return self.size
[docs] def __getitem__(self, index): """ Where `index` is an integer, return an ID. Where `index` is a slice, tuple, list or numpy array, return a new Assembly consisting of appropriate populations and (possibly newly created) population views. """ count = 0 boundaries = [0] for p in self.populations: count += p.size boundaries.append(count) boundaries = np.array(boundaries, dtype=int) if isinstance(index, (int, np.integer)): # return an ID pindex = boundaries[1:].searchsorted(index, side='right') return self.populations[pindex][index - boundaries[pindex]] elif isinstance(index, (slice, tuple, list, np.ndarray)): if isinstance(index, slice) or (hasattr(index, "dtype") and index.dtype == bool): indices = np.arange(self.size)[index] else: indices = np.array(index) pindices = boundaries[1:].searchsorted(indices, side='right') views = [self.populations[i][indices[pindices == i] - boundaries[i]] for i in np.unique(pindices)] return self.__class__(*views) else: raise TypeError("indices must be integers, slices, lists, arrays, not %s" % type(index).__name__)
[docs] def __add__(self, other): """ An Assembly may be added to a Population, PopulationView or Assembly with the '+' operator, returning a new Assembly, e.g.:: a2 = a1 + p """ if isinstance(other, BasePopulation): return self.__class__(*(self.populations + [other])) elif isinstance(other, Assembly): return self.__class__(*(self.populations + other.populations)) else: raise TypeError("can only add a Population or another Assembly to an Assembly")
[docs] def __iadd__(self, other): """ A Population, PopulationView or Assembly may be added to an existing Assembly using the '+=' operator, e.g.:: a += p """ if isinstance(other, BasePopulation): self._insert(other) elif isinstance(other, Assembly): for p in other.populations: self._insert(p) else: raise TypeError("can only add a Population or another Assembly to an Assembly") return self
[docs] def sample(self, n, rng=None): """ Randomly sample `n` cells from the Assembly, and return a Assembly object. """ assert isinstance(n, int) if not rng: rng = random.NumpyRNG() indices = rng.permutation(np.arange(len(self), dtype=int))[0:n] logger.debug("The %d cells recorded have indices %s" % (n, indices)) logger.debug("%s.sample(%s)", self.label, n) return self[indices]
[docs] def initialize(self, **initial_values): """ Set the initial values of the state variables of the neurons in this assembly. """ for p in self.populations: p.initialize(**initial_values)
[docs] def get(self, parameter_names, gather=False, simplify=True): """ Get the values of the given parameters for every local cell in the Assembly, or, if gather=True, for all cells in the Assembly. """ if isinstance(parameter_names, str): parameter_names = (parameter_names,) return_list = False else: return_list = True parameters = defaultdict(list) for p in self.populations: population_values = p.get(parameter_names, gather, simplify=False) for name, arr in zip(parameter_names, population_values): parameters[name].append(arr) for name, value_list in parameters.items(): parameters[name] = np.hstack(value_list) if simplify: parameters[name] = simplify_parameter_array(parameters[name]) values = [parameters[name] for name in parameter_names] if return_list: return values else: assert len(parameter_names) == 1 return values[0]
[docs] def set(self, **parameters): """ Set one or more parameters for every cell in the Assembly. Values passed to set() may be: (1) single values (2) RandomDistribution objects (3) mapping functions, where a mapping function accepts a single argument (the cell index) and returns a single value. Here, a "single value" may be either a single number or a list/array of numbers (e.g. for spike times). """ for p in self.populations: p.set(**parameters)
@deprecated("set(parametername=rand_distr)") def rset(self, parametername, rand_distr): self.set(parametername=rand_distr)
[docs] def record(self, variables, to_file=None, sampling_interval=None, locations=None): """ Record the specified variable or variables for all cells in the Assembly. `variables` may be either a single variable name or a list of variable names. For a given celltype class, `celltype.recordable` contains a list of variables that can be recorded for that celltype. If specified, `to_file` should be either a filename or a Neo IO instance and `write_data()` will be automatically called when `end()` is called. """ for p in self.populations: p.record(variables, to_file, sampling_interval, locations=locations)
@deprecated("record('v')") def record_v(self, to_file=True): """Record the membrane potential from all cells in the Assembly.""" self.record('v', to_file) @deprecated("record(['gsyn_exc', 'gsyn_inh'])") def record_gsyn(self, to_file=True): """Record synaptic conductances from all cells in the Assembly.""" self.record(['gsyn_exc', 'gsyn_inh'], to_file)
[docs] def get_population(self, label): """ Return the Population/PopulationView from within the Assembly that has the given label. If no such Population exists, raise KeyError. """ for p in self.populations: if label == p.label: return p raise KeyError("Assembly does not contain a population with the label %s" % label)
[docs] def save_positions(self, file): """ Save positions to file. The output format is id x y z """ if isinstance(file, str): file = files.StandardTextFile(file, mode='w') cells = self.all_cells result = np.empty((len(cells), 4)) result[:, 0] = np.array([self.id_to_index(id) for id in cells]) result[:, 1:4] = self.positions.T if self._simulator.state.mpi_rank == 0: file.write(result, {'assembly': self.label}) file.close()
@property def position_generator(self): def gen(i): return self.positions[:, i] return gen
[docs] def get_data(self, variables='all', gather=True, clear=False, annotations=None): """ Return a Neo `Block` containing the data (spikes, state variables) recorded from the Assembly. `variables` - either a single variable name or a list of variable names Variables must have been previously recorded, otherwise an Exception will be raised. For parallel simulators, if `gather` is True, all data will be gathered to all nodes and the Neo `Block` will contain data from all nodes. Otherwise, the Neo `Block` will contain only data from the cells simulated on the local node. If `clear` is True, recorded data will be deleted from the `Assembly`. """ name = self.label description = self.describe() blocks = [p.get_data(variables, gather, clear) for p in self.populations] # adjust channel_ids to match assembly channel indices offset = 0 for block, p in zip(blocks, self.populations): for segment in block.segments: for signal_array in segment.analogsignals: signal_array.array_annotations["channel_index"] += offset offset += p.size for i, block in enumerate(blocks): logger.debug("%d: %s", i, block.name) for j, segment in enumerate(block.segments): segment.block = blocks[0] logger.debug(" %d: %s", j, segment.name) for arr in segment.analogsignals: arr.segment = blocks[0].segments[j] logger.debug(" %s %s", arr.shape, arr.name) merged_block = blocks[0] for block in blocks[1:]: merged_block.merge(block) merged_block.name = name merged_block.description = description if annotations: merged_block.annotate(**annotations) return merged_block
@deprecated("get_data('spikes')") def getSpikes(self, gather=True, compatible_output=True): return self.get_data('spikes', gather) @deprecated("get_data('v')") def get_v(self, gather=True, compatible_output=True): return self.get_data('v', gather) @deprecated("get_data(['gsyn_exc', 'gsyn_inh'])") def get_gsyn(self, gather=True, compatible_output=True): return self.get_data(['gsyn_exc', 'gsyn_inh'], gather)
[docs] def mean_spike_count(self, gather=True): """ Returns the mean number of spikes per neuron. """ spike_counts = self.get_spike_counts() total_spikes = sum(spike_counts.values()) if self._simulator.state.mpi_rank == 0 or not gather: # should maybe use allgather, and get the numbers on all nodes return float(total_spikes) / len(spike_counts) else: return np.nan
[docs] def get_spike_counts(self, gather=True): """ Returns the number of spikes for each neuron. """ try: spike_counts = self.populations[0].recorder.count( 'spikes', gather, self.populations[0]._record_filter) except errors.NothingToWriteError: spike_counts = {} for p in self.populations[1:]: try: spike_counts.update(p.recorder.count('spikes', gather, p._record_filter)) except errors.NothingToWriteError: pass return spike_counts
[docs] def write_data(self, io, variables='all', gather=True, clear=False, annotations=None): """ Write recorded data to file, using one of the file formats supported by Neo. `io`: a Neo IO instance `variables`: either a single variable name or a list of variable names. Variables must have been previously recorded, otherwise an Exception will be raised. For parallel simulators, if `gather` is True, all data will be gathered to the master node and a single output file created there. Otherwise, a file will be written on each node, containing only data from the cells simulated on that node. If `clear` is True, recorded data will be deleted from the `Population`. """ if isinstance(io, str): io = recording.get_io(io) if gather is False and self._simulator.state.num_processes > 1: io.filename += '.%d' % self._simulator.state.mpi_rank logger.debug("Recorder is writing '%s' to file '%s' with gather=%s" % ( variables, io.filename, gather)) data = self.get_data(variables, gather, clear, annotations) if self._simulator.state.mpi_rank == 0 or gather is False: logger.debug("Writing data to file %s" % io) io.write(data)
@deprecated("write_data(file, 'spikes')") def printSpikes(self, file, gather=True, compatible_output=True): self.write_data(file, 'spikes', gather) @deprecated("write_data(file, 'v')") def print_v(self, file, gather=True, compatible_output=True): self.write_data(file, 'v', gather) @deprecated("write_data(['gsyn_exc', 'gsyn_inh'])") def print_gsyn(self, file, gather=True, compatible_output=True): self.write_data(file, ['gsyn_exc', 'gsyn_inh'], gather)
[docs] def inject(self, current_source): """ Connect a current source to all cells in the Assembly. """ for p in self.populations: current_source.inject_into(p)
@property def injectable(self): return all(p.injectable for p in self.populations)
[docs] def describe(self, template='assembly_default.txt', engine='default'): """ Returns a human-readable description of the assembly. The output may be customized by specifying a different template togther with an associated template engine (see ``pyNN.descriptions``). If template is None, then a dictionary containing the template context will be returned. """ context = {"label": self.label, "populations": [p.describe(template=None) for p in self.populations]} return descriptions.render(engine, template, context)
[docs] def get_annotations(self, annotation_keys, simplify=True): """ Get the values of the given annotations for each population in the Assembly. """ if isinstance(annotation_keys, str): annotation_keys = (annotation_keys,) annotations = defaultdict(list) for key in annotation_keys: is_array_annotation = False for p in self.populations: annotation = p.annotations[key] annotations[key].append(annotation) is_array_annotation = isinstance(annotation, np.ndarray) if is_array_annotation: annotations[key] = np.hstack(annotations[key]) if simplify: annotations[key] = simplify_parameter_array(np.array(annotations[key])) return annotations