# Source code for spinn_front_end_common.interface.buffer_management.storage_objects.buffered_sending_region

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import bisect
import math
from spinnman.messages.eieio.command_messages import HostSendSequencedData
from spinnman.messages.eieio.data_messages import EIEIODataHeader
from spinnman.messages.eieio import EIEIOType
from spinnman.constants import UDP_MESSAGE_MAX_SIZE

# The size in bytes of the EIEIO data header that precedes the keys in each
# message.
# NOTE(review): this definition was missing from the extracted source (the
# expressions below were truncated mid-line) and has been reconstructed;
# confirm it against the upstream module before relying on it.
_HEADER_SIZE = EIEIODataHeader.get_header_size(EIEIOType.KEY_32_BIT, False)

# The number of bytes in each key to be sent
_N_BYTES_PER_KEY = EIEIOType.KEY_32_BIT.key_bytes  # @UndefinedVariable

# The number of keys allowed (different from the actual number as there is
# an additional header)
_N_KEYS_PER_MESSAGE = (UDP_MESSAGE_MAX_SIZE -
                       (HostSendSequencedData.get_min_packet_length() +
                        _HEADER_SIZE)) // _N_BYTES_PER_KEY


def get_n_bytes(n_keys):
    """ Get the number of bytes used by a given number of keys.

    The keys are split over as many messages as are needed, and every\
    message carries a fixed overhead in addition to the keys themselves.

    :param int n_keys: The number of keys
    :return: The total number of bytes needed to send the keys
    :rtype: int
    """
    # Get the total number of messages; float() keeps this a true division
    # even where "/" on two integers would otherwise truncate
    n_messages = int(math.ceil(float(n_keys) / _N_KEYS_PER_MESSAGE))

    # Add up the bytes: per-message overheads plus the keys themselves
    return ((HostSendSequencedData.get_min_packet_length() * n_messages) +
            (_HEADER_SIZE * n_messages) +
            (n_keys * _N_BYTES_PER_KEY))

class BufferedSendingRegion(object):
    """ A set of keys to be sent at given timestamps for a given region of\
        data.

    Keys may be added for timestamps in any order; the list of timestamps\
    is kept sorted as new timestamps are inserted.
    """

    __slots__ = [
        #: A dictionary of timestamp -> list of keys
        "_buffer",

        #: A sorted list of the timestamps that have been added
        "_timestamps",

        #: The current position in the list of timestamps
        "_current_timestamp_pos"
    ]

    def __init__(self):
        self._buffer = dict()
        self._timestamps = list()
        self._current_timestamp_pos = 0

    def add_key(self, timestamp, key):
        """ Add a key to be sent at a given time.

        :param int timestamp: The time at which the key is to be sent
        :param int key: The key to send
        """
        if timestamp not in self._buffer:
            # First key for this timestamp: record the timestamp in sorted
            # position and start its list of keys
            bisect.insort(self._timestamps, timestamp)
            self._buffer[timestamp] = list()
        self._buffer[timestamp].append(key)

    def add_keys(self, timestamp, keys):
        """ Add a set of keys to be sent at the given time.

        If ``keys`` is empty, nothing is recorded for the timestamp.

        :param int timestamp: The time at which the keys are to be sent
        :param iterable(int) keys: The keys to send
        """
        for key in keys:
            self.add_key(timestamp, key)

    @property
    def n_timestamps(self):
        """ The number of timestamps available.

        :rtype: int
        """
        return len(self._timestamps)

    @property
    def timestamps(self):
        """ The timestamps for which there are keys, in ascending order.

        This is the internal list itself, not a copy.

        :rtype: iterable(int)
        """
        return self._timestamps

    def get_n_keys(self, timestamp):
        """ Get the number of keys for a given timestamp.

        :param int timestamp:
            the time stamp to check if there's still keys to transmit
        :return: The number of keys still held for the timestamp, or 0 if
            there are none
        :rtype: int
        """
        if timestamp in self._buffer:
            return len(self._buffer[timestamp])
        return 0

    @property
    def is_next_timestamp(self):
        """ Determines if there is another timestamp to be processed.

        True if the current position has not yet passed the end of the\
        list of timestamps, False otherwise.

        :rtype: bool
        """
        return self._current_timestamp_pos < len(self._timestamps)

    @property
    def next_timestamp(self):
        """ The next timestamp of the data to be sent, or None if no more data.

        :rtype: int or None
        """
        if self.is_next_timestamp:
            return self._timestamps[self._current_timestamp_pos]
        return None

    def is_next_key(self, timestamp):
        """ Determine if there is another key for the given timestamp.

        :param int timestamp:
            the time stamp to check if there's still keys to transmit
        :rtype: bool
        """
        if timestamp in self._buffer:
            return bool(self._buffer[timestamp])
        return False

    @property
    def next_key(self):
        """ The next key to be sent.

        Reading this property consumes the key: it is removed from the\
        buffer, and keys of one timestamp are returned most recently added\
        first. Once the last key of the timestamp has been taken, the\
        position moves on to the following timestamp.

        :rtype: int
        :raises KeyError:
            If there are no keys held for the next timestamp (including
            when there is no next timestamp at all)
        """
        next_timestamp = self.next_timestamp
        keys = self._buffer[next_timestamp]
        key = keys.pop()
        if not keys:
            # Timestamp exhausted: drop its entry and advance the position
            del self._buffer[next_timestamp]
            self._current_timestamp_pos += 1
        return key

    @property
    def current_timestamp(self):
        """ The current position in the list of timestamps.

        Note that this is an index into the list, not a timestamp value.

        :rtype: int
        """
        return self._current_timestamp_pos

    def rewind(self):
        """ Rewind the buffer to initial position.

        Only the position is reset; keys that have already been taken via\
        :py:attr:`next_key` are not restored.
        """
        self._current_timestamp_pos = 0

    def clear(self):
        """ Clears the buffer, discarding all keys and timestamps.
        """
        self._buffer = dict()
        self._timestamps = list()
        self._current_timestamp_pos = 0