Source code for spynnaker.pyNN.utilities.utility_calls

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Utility module containing simple helper functions.
"""
import logging
import os
import math
import numpy
from pyNN.random import RandomDistribution
from scipy.stats import binom
from spinn_utilities.log import FormatAdapter
from spinn_utilities.safe_eval import SafeEval
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spynnaker.pyNN.utilities.random_stats import (
    RandomStatsExponentialImpl, RandomStatsGammaImpl, RandomStatsLogNormalImpl,
    RandomStatsNormalClippedImpl, RandomStatsNormalImpl,
    RandomStatsPoissonImpl, RandomStatsRandIntImpl, RandomStatsUniformImpl,
    RandomStatsVonmisesImpl, RandomStatsBinomialImpl)
from spinn_front_end_common.utilities.constants import (
    MICRO_TO_SECOND_CONVERSION)
from spynnaker.pyNN.utilities.constants import WRITE_BANDWIDTH_BYTES_PER_SECOND

# Module-level logger, wrapped in a FormatAdapter
logger = FormatAdapter(logging.getLogger(__name__))

MAX_RATE = 2 ** 32 - 1  # To allow a uint32_t to be used to store the rate

# Bounds used by create_mars_kiss_seeds: each draw is requested with
# -BASE_RANDOM_FOR_MARS_64 and CAP_RANDOM_FOR_MARS_64 as its limits and then
# has BASE_RANDOM_FOR_MARS_64 added to it
BASE_RANDOM_FOR_MARS_64 = 0x80000000
CAP_RANDOM_FOR_MARS_64 = 0x7FFFFFFF
# Number of values in a seed; in order they are x, y, z, c
N_RANDOM_NUMBERS = 4
# Value substituted for y when the generated y is zero
ARBITRARY_Y = 13031301
# The c value of a seed is reduced modulo this and then incremented by one
MARS_C_MAX = 698769068

# Statistics implementation to use for a distribution, keyed by the name of
# the distribution (looked up via ``dist.name`` by the functions below)
STATS_BY_NAME = {
    'binomial': RandomStatsBinomialImpl(),
    'gamma': RandomStatsGammaImpl(),
    'exponential': RandomStatsExponentialImpl(),
    'lognormal': RandomStatsLogNormalImpl(),
    'normal': RandomStatsNormalImpl(),
    'normal_clipped': RandomStatsNormalClippedImpl(),
    'normal_clipped_to_boundary': RandomStatsNormalClippedImpl(),
    'poisson': RandomStatsPoissonImpl(),
    'uniform': RandomStatsUniformImpl(),
    'randint': RandomStatsRandIntImpl(),
    'vonmises': RandomStatsVonmisesImpl()}


def check_directory_exists_and_create_if_not(filename):
    """ Create a parent directory for a file if it doesn't exist

    Nothing is done when the file name has no directory component, or when
    the parent path already exists.

    :param str filename: The file whose parent directory is to be created
    """
    directory = os.path.dirname(filename)
    if directory and not os.path.exists(directory):
        # exist_ok avoids a failure if something else creates the directory
        # between the existence check above and this call
        os.makedirs(directory, exist_ok=True)
def convert_param_to_numpy(param, no_atoms):
    """ Turn a parameter into a numpy array of floats.

    :param param: the parameter to convert; a RandomDistribution is sampled,
        a non-iterable value is repeated, and an iterable is converted as is
    :type param: ~pyNN.random.RandomDistribution or int or float or
        list(int) or list(float) or ~numpy.ndarray
    :param int no_atoms: the number of atoms the parameter applies to
    :return: the parameter values as an array of floats
    :rtype: ~numpy.ndarray(float)
    :raises ConfigurationException:
        if an iterable is given whose length is not no_atoms
    """
    if isinstance(param, RandomDistribution):
        # A draw may come back as a bare value rather than a sequence;
        # wrap it so that the result is always an array
        drawn = param.next(n=no_atoms)
        if not hasattr(drawn, '__iter__'):
            drawn = [drawn]
        return numpy.array(drawn, dtype="float")

    if not hasattr(param, '__iter__'):
        # One value given: every atom gets a copy of it
        repeated = [param] * no_atoms
        return numpy.array(repeated, dtype="float")

    # Many values given: there must be exactly one per atom
    if len(param) != no_atoms:
        raise ConfigurationException(
            "The number of params does not equal with the number of atoms in"
            " the vertex")
    return numpy.array(param, dtype="float")
def convert_to(value, data_type):
    """ Convert a value to a given data type

    :param value: The value to convert
    :param ~data_specification.enums.DataType data_type:
        The data type to convert to; its ``encode_as_int`` method and
        ``struct_encoding`` attribute are used
    :return: The encoded value, rounded and cast to the numpy type named by
        ``data_type.struct_encoding``
    """
    encoded = data_type.encode_as_int(value)
    rounded = numpy.round(encoded)
    return rounded.astype(data_type.struct_encoding)
def read_in_data_from_file(
        file_path, min_atom, max_atom, min_time, max_time, extra=False):
    """ Read in a file of data values, one per line, where each line holds
    tab-separated fields in the order: time, atom ID, data value

    Lines starting with ``#`` and blank lines are skipped. Values outside
    the requested atom or time range are reported with a printed message and
    left out of the result.

    :param str file_path: absolute path to a file containing the data
    :param int min_atom: lowest neuron ID to read in (inclusive)
    :param int max_atom: highest neuron ID to read in (exclusive)
    :param min_time: earliest time to read values for (inclusive)
    :type min_time: float or int
    :param max_time: latest time to read values for (exclusive)
    :type max_time: float or int
    :param bool extra:
        whether each line carries a fourth field, which is ignored
    :return: a numpy array with one (atom ID, time stamp, data value) row
        per accepted line, ordered by atom ID and then by time
    :rtype: ~numpy.ndarray
    """
    times = list()
    atom_ids = list()
    data_items = list()
    evaluator = SafeEval()
    with open(file_path, 'r') as f:
        # Iterate the file lazily rather than loading every line up front
        for line in f:
            if line.startswith('#') or not line.strip():
                continue
            fields = line.split("\t")
            if extra:
                # The fourth field is discarded; it must not be stored in
                # ``extra`` as that would change how later lines are parsed
                time, neuron_id, data_value, _ = fields
            else:
                time, neuron_id, data_value = fields
            time = float(evaluator.eval(time))
            neuron_id = int(evaluator.eval(neuron_id))
            data_value = float(evaluator.eval(data_value))
            if (min_atom <= neuron_id < max_atom and
                    min_time <= time < max_time):
                times.append(time)
                atom_ids.append(neuron_id)
                data_items.append(data_value)
            else:
                print("failed to enter {}:{}".format(neuron_id, time))

    # Columns are stacked as (atom ID, time, value)
    result = numpy.dstack((atom_ids, times, data_items))[0]
    # lexsort uses its last key as the primary one: atom ID, then time
    return result[numpy.lexsort((times, atom_ids))]
def read_spikes_from_file(
        file_path, min_atom=0, max_atom=float('inf'), min_time=0,
        max_time=float('inf'), split_value="\t"):
    """ Read spikes from a file where each line holds a time followed by a
    neuron ID, separated by split_value

    Lines starting with ``#`` are skipped, as are spikes outside the
    requested neuron ID or time range.

    :param str file_path: absolute path to a file containing spike values
    :param min_atom: lowest neuron ID to read in (inclusive)
    :type min_atom: int or float
    :param max_atom: highest neuron ID to read in (exclusive)
    :type max_atom: int or float
    :param min_time: earliest spike time to read in (inclusive)
    :type min_time: float or int
    :param max_time: latest spike time to read in (exclusive)
    :type max_time: float or int
    :param str split_value: the pattern to split each line by
    :return: a numpy array of [neuron ID, time] rows in sorted order
    :rtype: ~numpy.ndarray
    """
    # pylint: disable=too-many-arguments

    # None is still accepted for any limit, as earlier versions used None
    # instead of default values to mean "no limit"
    min_atom = 0.0 if min_atom is None else min_atom
    max_atom = float('inf') if max_atom is None else max_atom
    min_time = 0.0 if min_time is None else min_time
    max_time = float('inf') if max_time is None else max_time

    with open(file_path, 'r') as f_source:
        lines = f_source.readlines()

    parser = SafeEval()
    spikes = []
    for text in lines:
        if text.startswith('#'):
            continue
        parts = text.split(split_value)
        spike_time = float(parser.eval(parts[0]))
        atom = float(parser.eval(parts[1]))
        if not (min_atom <= atom < max_atom):
            continue
        if not (min_time <= spike_time < max_time):
            continue
        spikes.append([atom, spike_time])
    return numpy.array(sorted(spikes))
def get_probable_maximum_selected(
        n_total_trials, n_trials, selection_prob, chance=(1.0 / 100.0)):
    """ Get the likely maximum number of items that will be selected from a
    set of n_trials from a total set of n_total_trials with a probability
    of selection of selection_prob

    :param n_total_trials: the total number of trials; chance is divided
        evenly between them
    :param n_trials: the number of trials in the set being considered
    :param selection_prob: the probability of selection in a single trial
    :param chance: the overall tail probability allowed
    :return: the binomial quantile at ``1 - chance / n_total_trials``
    """
    tail = chance / float(n_total_trials)
    return binom.ppf(1.0 - tail, n_trials, selection_prob)
def get_probable_minimum_selected(
        n_total_trials, n_trials, selection_prob, chance=(1.0 / 100.0)):
    """ Get the likely minimum number of items that will be selected from a
    set of n_trials from a total set of n_total_trials with a probability
    of selection of selection_prob

    :param n_total_trials: the total number of trials; chance is divided
        evenly between them
    :param n_trials: the number of trials in the set being considered
    :param selection_prob: the probability of selection in a single trial
    :param chance: the overall tail probability allowed
    :return: the binomial quantile at ``chance / n_total_trials``
    """
    tail = chance / float(n_total_trials)
    return binom.ppf(tail, n_trials, selection_prob)
def get_probability_within_range(dist, lower, upper):
    """ Get the probability that a value will fall within the given range
    for a given RandomDistribution

    :param dist: the distribution; its name selects the statistics to use
    :param lower: the lower end of the range
    :param upper: the upper end of the range
    :return: the CDF at upper minus the CDF at lower
    """
    impl = STATS_BY_NAME[dist.name]
    upper_cdf = impl.cdf(dist, upper)
    lower_cdf = impl.cdf(dist, lower)
    return upper_cdf - lower_cdf
def get_maximum_probable_value(dist, n_items, chance=(1.0 / 100.0)):
    """ Get the likely maximum value of a RandomDistribution given a
    number of draws

    :param dist: the distribution; its name selects the statistics to use
    :param n_items: the number of draws; chance is divided evenly between
        them
    :param chance: the overall tail probability allowed
    :return: the quantile of dist at ``1 - chance / n_items``
    """
    impl = STATS_BY_NAME[dist.name]
    tail = chance / float(n_items)
    return impl.ppf(dist, 1.0 - tail)
def get_minimum_probable_value(dist, n_items, chance=(1.0 / 100.0)):
    """ Get the likely minimum value of a RandomDistribution given a
    number of draws

    :param dist: the distribution; its name selects the statistics to use
    :param n_items: the number of draws; chance is divided evenly between
        them
    :param chance: the overall tail probability allowed
    :return: the quantile of dist at ``chance / n_items``
    """
    impl = STATS_BY_NAME[dist.name]
    tail = chance / float(n_items)
    return impl.ppf(dist, tail)
def get_mean(dist):
    """ Get the mean of a RandomDistribution

    :param dist: the distribution; its name selects the statistics to use
    :return: the mean as reported by the matching statistics implementation
    """
    return STATS_BY_NAME[dist.name].mean(dist)
def get_standard_deviation(dist):
    """ Get the standard deviation of a RandomDistribution

    :param dist: the distribution; its name selects the statistics to use
    :return: the standard deviation as reported by the matching statistics
        implementation
    """
    return STATS_BY_NAME[dist.name].std(dist)
def get_variance(dist):
    """ Get the variance of a RandomDistribution

    :param dist: the distribution; its name selects the statistics to use
    :return: the variance as reported by the matching statistics
        implementation
    """
    return STATS_BY_NAME[dist.name].var(dist)
def high(dist):
    """ Gets the high or max boundary value for this distribution

    Could return None

    :param dist: the distribution; its name selects the statistics to use
    :return: the upper boundary as reported by the matching statistics
        implementation
    """
    return STATS_BY_NAME[dist.name].high(dist)
def low(dist):
    """ Gets the low or min boundary value for this distribution

    Could return None

    :param dist: the distribution; its name selects the statistics to use
    :return: the lower boundary as reported by the matching statistics
        implementation
    """
    return STATS_BY_NAME[dist.name].low(dist)
def _validate_mars_kiss_64_seed(seed):
    """ Update the seed to make it compatible with the RNG algorithm

    The seed is modified in place and also returned.

    :param seed: the seed values, in the order x, y, z, c
    :return: the same seed object, after adjustment
    """
    # y (seed[1]) is not allowed to be zero; swap in an arbitrary non-zero
    # value when it is
    if seed[1] == 0:
        seed[1] = ARBITRARY_Y

    # Reducing c (seed[3]) and adding one keeps it non-zero (so z and c can
    # never both be zero) and no larger than MARS_C_MAX
    reduced_c = seed[3] % MARS_C_MAX
    seed[3] = reduced_c + 1
    return seed
def create_mars_kiss_seeds(rng):
    """ Generates seed values using the given random number generator and
    adjusts them so that they are suitable for use as a mars 64 kiss seed.

    :param rng: the random number generator; its ``randint`` is called once
        per seed value
    :type rng: ~numpy.random.RandomState
    :return: a list of N_RANDOM_NUMBERS values which are used by the mars64
        kiss random number generator for seeds.
    :rtype: list(int)
    """
    raw_seed = []
    for _ in range(N_RANDOM_NUMBERS):
        draw = rng.randint(-BASE_RANDOM_FOR_MARS_64, CAP_RANDOM_FOR_MARS_64)
        # Shift the signed draw up so that the stored value is not negative
        raw_seed.append(draw + BASE_RANDOM_FOR_MARS_64)
    return _validate_mars_kiss_64_seed(raw_seed)
def get_n_bits(n_values):
    """ Determine how many bits are required for the given number of values

    The result is found with exact integer arithmetic. Taking the ceiling
    of a floating-point base-2 logarithm can give an answer that is one too
    large when the logarithm of an exact power of two is rounded upwards.

    :param int n_values: the number of values (starting at 0)
    :return: the number of bits required to express that many values
    :rtype: int
    :raises ValueError: if n_values is negative
    """
    if n_values == 0:
        return 0
    if n_values == 1:
        return 1
    if n_values < 0:
        raise ValueError("n_values must not be negative")
    # The smallest b with 2 ** b >= n is the bit length of n - 1; rounding
    # up first keeps this correct if a non-integer count is passed in
    return (int(math.ceil(n_values)) - 1).bit_length()
def moved_in_v6(old_location, new_location):
    """ Warns the users that they are using an old import.

    In version 7 this will be upgraded to an exception and then later
    removed.

    :param str old_location: old import
    :param str new_location: new import
    :raise NotImplementedError:
        if the CONTINUOUS_INTEGRATION environment variable is "true"
        (in any letter case)
    """
    ci_flag = os.environ.get('CONTINUOUS_INTEGRATION', 'false')
    if ci_flag.lower() == 'true':
        raise NotImplementedError("Old import: {}".format(old_location))

    message = (
        "File {} moved to {}. Please fix your imports. "
        "In version 7 this will fail completely.").format(
            old_location, new_location)
    logger.warning(message)
def get_time_to_write_us(n_bytes, n_cores):
    """ Determine how long a write of a given number of bytes will take in us

    The write bandwidth is shared equally between the cores that are
    writing.

    :param int n_bytes: The number of bytes to transfer
    :param int n_cores: How many cores will be writing at the same time
    :return: the time taken, rounded up to a whole number
    :rtype: int
    """
    per_core_bandwidth = WRITE_BANDWIDTH_BYTES_PER_SECOND / n_cores
    duration_seconds = n_bytes / per_core_bandwidth
    duration_us = duration_seconds * MICRO_TO_SECOND_CONVERSION
    return int(math.ceil(duration_us))