Source code for spinnman.processes.get_heap_process

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import struct
import functools
from spinnman.processes import AbstractMultiConnectionProcess
from spinnman.constants import SYSTEM_VARIABLE_BASE_ADDRESS
from spinnman.model import HeapElement
from spinnman.messages.spinnaker_boot import SystemVariableDefinition
from spinnman.messages.scp.impl import ReadMemory


HEAP_ADDRESS = SystemVariableDefinition.sdram_heap_address
_ADDRESS = struct.Struct("<I")
_HEAP_POINTER = struct.Struct("<4xI")
_ELEMENT_HEADER = struct.Struct("<II")


[docs]class GetHeapProcess(AbstractMultiConnectionProcess): __slots__ = [ "_blocks", "_heap_address", "_next_block_address"] def __init__(self, connection_selector): """ :param connection_selector: :type connection_selector: AbstractMultiConnectionProcessConnectionSelector """ super().__init__(connection_selector) self._heap_address = None self._next_block_address = None self._blocks = list() def _read_heap_address_response(self, response): self._heap_address = _ADDRESS.unpack_from( response.data, response.offset)[0] def _read_heap_pointer(self, response): self._next_block_address = _HEAP_POINTER.unpack_from( response.data, response.offset)[0] def _read_next_block(self, block_address, response): self._next_block_address, free = _ELEMENT_HEADER.unpack_from( response.data, response.offset) if self._next_block_address != 0: self._blocks.append(HeapElement( block_address, self._next_block_address, free)) def _read_address(self, chip_address, address, size, callback): (x, y) = chip_address self._send_request( ReadMemory(x, y, address, size), callback) self._finish() self.check_for_error()
[docs] def get_heap(self, chip_address, pointer=HEAP_ADDRESS): """ :param tuple(int,int) chip_address: x, y :param SystemVariableDefinition pointer: :rtype: list(HeapElement) """ self._read_address( chip_address, SYSTEM_VARIABLE_BASE_ADDRESS + pointer.offset, pointer.data_type.value, self._read_heap_address_response) self._read_address( chip_address, self._heap_address, 8, self._read_heap_pointer) while self._next_block_address != 0: self._read_address( chip_address, self._next_block_address, 8, functools.partial( self._read_next_block, self._next_block_address)) return self._blocks