Source code for spinn_front_end_common.utilities.database.database_connection

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
from threading import Thread
from spinn_utilities.log import FormatAdapter
from spinnman.exceptions import (
    SpinnmanIOException, SpinnmanInvalidPacketException,
    SpinnmanTimeoutException)
from spinnman.messages.eieio.command_messages import EIEIOCommandHeader
from spinnman.connections.udp_packet_connections import UDPConnection
from spinnman.constants import EIEIO_COMMAND_IDS as CMDS
from spinn_front_end_common.utilities.constants import NOTIFY_PORT
from .database_reader import DatabaseReader

logger = FormatAdapter(logging.getLogger(__name__))


[docs]class DatabaseConnection(UDPConnection): """ A connection from the toolchain which will be notified when the \ database has been written, and can then respond when the database \ has been read, and further wait for notification that the simulation \ has started. .. note:: The machine description database reader can only be used while the registered database callbacks are running. """ __slots__ = [ "__database_callbacks", "__pause_and_stop_callback", "__running", "__start_resume_callback"] def __init__(self, start_resume_callback_function=None, stop_pause_callback_function=None, local_host=None, local_port=NOTIFY_PORT): """ :param callable start_resume_callback_function: A function to be called when the start message has been received. This function should not take any parameters or return anything. :param str local_host: Optional specification of the local hostname or IP address of the interface to listen on :param int local_port: Optional specification of the local port to listen on. Must match the port that the toolchain will send the notification on (19999 by default) """ super().__init__( local_host=local_host, local_port=local_port, remote_host=None, remote_port=None) thread = Thread(name="SpyNNakerDatabaseConnection:{}:{}".format( self.local_ip_address, self.local_port), target=self.__run) self.__database_callbacks = list() self.__start_resume_callback = start_resume_callback_function self.__pause_and_stop_callback = stop_pause_callback_function self.__running = False thread.daemon = True thread.start()
[docs] def add_database_callback(self, database_callback_function): """ Add a database callback to be called when the database is ready. :param callable(DatabaseReader,None) database_callback_function: A function to be called when the database message has been received. This function should take a single parameter, which will be a DatabaseReader object. Once the function returns, it will be assumed that the database has been read and will not be needed further, and the return response will be sent. """ self.__database_callbacks.append(database_callback_function)
def __run(self): # pylint: disable=broad-except self.__running = True logger.info( "{}:{} Waiting for message to indicate that the database is " "ready", self.local_ip_address, self.local_port) try: while self.__running: try: data, address = self.receive_with_address(timeout=3) except SpinnmanTimeoutException: continue self.__read_db(address, data) # Wait for the start of the simulation if self.__start_resume_callback is not None: self.__start_resume() # Wait for the end of the simulation if self.__pause_and_stop_callback is not None: self.__pause_stop() except Exception as e: logger.error("Failure processing database callback", exc_info=True) raise SpinnmanIOException(str(e)) from e finally: self.__running = False def __read_db(self, address, data): # Read the read packet confirmation logger.info("{}:{} Reading database", self.local_ip_address, self.local_port) if len(data) > 2: database_path = data[2:].decode('utf-8') logger.info("database is at {}", database_path) # Call the callback with DatabaseReader(database_path) as db_reader: for db_callback in self.__database_callbacks: db_callback(db_reader) else: logger.warning("Database path was empty - assuming no database") # Send the response logger.info("Notifying the toolchain that the database has been read") self._send_command(CMDS.DATABASE_CONFIRMATION, address) def __start_resume(self): logger.info( "Waiting for message to indicate that the simulation has " "started or resumed") command_code = self._receive_command() if command_code != CMDS.START_RESUME_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a start/resume command code now, and did not " "receive it") # Call the callback self.__start_resume_callback() def __pause_stop(self): logger.info( "Waiting for message to indicate that the simulation has " "stopped or paused") command_code = self._receive_command() if command_code != CMDS.STOP_PAUSE_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a pause/stop command code now, and did not " "receive it") # Call the callback self.__pause_and_stop_callback() def _send_command(self, command, address): self.send_to(EIEIOCommandHeader(command.value).bytestring, address) def _receive_command(self): return EIEIOCommandHeader.from_bytestring(self.receive(), 0).command
[docs] def close(self): self.__running = False super().close()