Source code for spinnman.connections.udp_packet_connections.bmp_connection

# Copyright (c) 2017-2019 The University of Manchester
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import struct
from spinn_utilities.overrides import overrides
from .udp_connection import UDPConnection
from .utils import update_sdp_header_for_udp_send
from spinnman.constants import SCP_SCAMP_PORT
from spinnman.messages.scp.enums import SCPResult
from spinnman.connections.abstract_classes import SCPReceiver, SCPSender

# Two little-endian unsigned 16-bit values; used to pull the result code and
# the sequence number out of a received response packet.
_TWO_SHORTS = struct.Struct("<2H")
# Two pad bytes; prepended to the request bytes before they are sent.
_TWO_SKIP = struct.Struct("<2x")
# Layout used by BMPConnection.__repr__; takes seven positional values in the
# order the field names appear.  The closing parenthesis was previously
# missing, which left the repr text unbalanced.
_REPR_TEMPLATE = (
    "BMPConnection(cabinet={}, frame={}, boards={}, "
    "local_host={}, local_port={}, remote_host={}, remote_port={})")

class BMPConnection(UDPConnection, SCPReceiver, SCPSender):
    """ A UDP connection used to send SCP requests to, and receive SCP
        responses from, the BMP of a SpiNNaker machine.
    """
    __slots__ = [
        "_boards",
        "_cabinet",
        "_frame"]

    def __init__(self, connection_data):
        """
        :param BMPConnectionData connection_data:
            Describes the BMP to connect to; its ``ip_address``,
            ``port_num``, ``cabinet``, ``frame`` and ``boards`` are read.
            When ``port_num`` is None, ``SCP_SCAMP_PORT`` is used instead.
        """
        if connection_data.port_num is None:
            remote_port = SCP_SCAMP_PORT
        else:
            remote_port = connection_data.port_num
        super().__init__(
            remote_host=connection_data.ip_address,
            remote_port=remote_port)
        self._cabinet = connection_data.cabinet
        self._frame = connection_data.frame
        self._boards = connection_data.boards

    @property
    def cabinet(self):
        """ The ID of the cabinet that holds the BMP

        :rtype: int
        """
        return self._cabinet

    @property
    def frame(self):
        """ The ID of the frame that holds the BMP

        :rtype: int
        """
        return self._frame

    @property
    def boards(self):
        """ The boards that the BMP supports

        :rtype: iterable of int
        """
        return self._boards

    @property
    def chip_x(self):
        """ Required by SCPSender; a BMP always reports 0
        """
        return 0

    @property
    def chip_y(self):
        """ Required by SCPSender; a BMP always reports 0
        """
        return 0

    @overrides(SCPSender.get_scp_data)
    def get_scp_data(self, scp_request):
        # The SDP header is always updated with the coordinates (0, 0),
        # matching chip_x / chip_y above.
        update_sdp_header_for_udp_send(scp_request.sdp_header, 0, 0)
        # Two pad bytes go in front of the serialised request.
        padding = _TWO_SKIP.pack()
        return padding + scp_request.bytestring

    @overrides(SCPReceiver.receive_scp_response)
    def receive_scp_response(self, timeout=1.0):
        packet = self.receive(timeout)
        # The result code and sequence number are the two 16-bit values
        # found at byte offset 10 of the received packet.
        result_code, seq_num = _TWO_SHORTS.unpack_from(packet, 10)
        # NOTE(review): the trailing 2 is presumably the offset at which the
        # caller should start reading the packet - confirm against
        # SCPReceiver.receive_scp_response.
        return SCPResult(result_code), seq_num, packet, 2

    @overrides(SCPSender.send_scp_request)
    def send_scp_request(self, scp_request):
        payload = self.get_scp_data(scp_request)
        self.send(payload)

    def __repr__(self):
        # Values are gathered in the order the template names them.
        fields = (
            self._cabinet, self._frame, self._boards,
            self.local_ip_address, self.local_port,
            self.remote_ip_address, self.remote_port)
        return _REPR_TEMPLATE.format(*fields)