Raspberry Pi Cluster Node – 13 Abstracting Slave Code
This post builds on my previous posts in the Raspberry Pi Cluster series by abstracting the slave code so it is ready for more complex slaves.
Why I am abstracting the Slave code
As the system becomes more complex there will be a number of slaves performing different tasks.
Currently our basic slave reports a heartbeat to let the master know it's still alive. When it joins the cluster it sends over data about the node. Once this has been sent it receives cluster information about the master. Finally, the slave begins repeatedly sending heartbeat data.
We want all slaves to follow this standard pattern. The reconnection code and the sending of the node details need to be uniform across all slaves.
To do this I am going to abstract the code that performs this into a single class. The only difference between the slaves will be what each one does once it has connected to the master.
The basic slave will continue to send the heartbeat, while more complex slaves will perform additional steps.
In addition to this I am planning to run the connection code in a separate thread. This will mean that while each slave is communicating with the master it is able to perform other tasks.
Abstracting the Slave code
Changing the slave code to use a thread works similarly to how we handle each slave connecting to the master. A class is created whose parent is threading.Thread, and when used it is instantiated and then started in a new thread.
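As a minimal sketch of that pattern, separate from the cluster code, the following subclasses threading.Thread and starts it while the main thread carries on with other work. The BackgroundWorker class and its label argument are hypothetical names used only for illustration.

```python
import threading
import time


class BackgroundWorker(threading.Thread):
    """Hypothetical stand-in for a slave thread; not part of the cluster code."""

    def __init__(self, label):
        threading.Thread.__init__(self)
        self.label = label
        self.results = []

    def run(self):
        # run() is executed in the new thread once start() has been called.
        for step in range(3):
            self.results.append("{label} step {step}".format(label=self.label, step=step))
            time.sleep(0.01)


worker = BackgroundWorker("demo")
worker.start()                     # returns immediately; run() continues in the background
main_thread_work = sum(range(10))  # the main thread is free to do other work meanwhile
worker.join()                      # wait for the background thread to finish
```

Calling start() rather than run() is what moves the work onto a new thread; calling run() directly would execute it in the calling thread.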
In this case we are going to call the class RpiBasicSlaveThread.
import random
import threading


class RpiBasicSlaveThread(threading.Thread):
    def __init__(self, master_ip, socket_port):
        threading.Thread.__init__(self)
        # Random identifier for this slave; not guaranteed to be unique
        self.client_number = random.randint(1, 100000)
        self.server_address = (master_ip, socket_port)
        self.sock = None
The __init__ method takes the master IP and the socket port, which will be used to create the socket connection. In addition to this, a random client number is generated to be used as a means of identification.
As in previous code, the client number could conflict with another slave, so going forward this will be improved to ensure no client has the same ID.
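One possible direction for this, which is an assumption on my part and not necessarily what the series ends up doing, is to generate the identifier with the standard library's uuid module. A random UUID makes a clash extremely unlikely, although only an ID handed out by the master could strictly guarantee uniqueness. The generate_client_id helper below is a hypothetical name.

```python
import uuid


def generate_client_id():
    """Return a random 128-bit identifier as a string (hypothetical helper)."""
    return str(uuid.uuid4())


first_id = generate_client_id()
second_id = generate_client_id()
```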
The run() method is called in the new thread once the thread has been started by calling start().
    def run(self):
        logger.info("Starting script...")
        while True:
            logger.info("Connecting to the master...")
            connected = False
            # Keep retrying until the master accepts the connection
            while not connected:
                try:
                    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self.sock.connect(self.server_address)
                    connected = True
                except socket.error:
                    logger.info("Failed to connect to master, waiting 60 seconds and trying again")
                    time.sleep(60)
            logger.info("Successfully connected to the master")
            try:
                logger.info("Sending an initial hello to master")
                send_message(self.sock, create_payload(get_base_machine_info(), 'computer_details'))
                send_message(self.sock, create_payload("computer_details", "info"))
                message = get_message(self.sock)
                logger.info("We have information about the master " + json.dumps(message['payload']))
                # Hand over to the slave-specific behaviour until the master disconnects
                while True:
                    self.perform_action()
            except DisconnectionException as e:
                logger.info("Got disconnection exception with message: " + e.message)
                logger.info("Slave will try and reconnect once master is back online")
This run method is primarily the contents of the original basic slave script. It attempts to connect to the master until it succeeds. It also contains the disconnection handling, which automatically reconnects if the master is offline for a period of time.
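The retry behaviour can be looked at in isolation. The sketch below pulls the loop into a hypothetical connect_with_retry helper (the post keeps this logic inline in run()) and drives it with a fake connection function that fails twice, so it runs without a master or any real waiting.

```python
import socket
import time


def connect_with_retry(connect, retry_delay=60, sleep=time.sleep):
    """Call connect() until it succeeds, sleeping between failed attempts.

    Hypothetical helper for illustration only.
    Returns the connection and the number of attempts it took.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return connect(), attempts
        except socket.error:
            sleep(retry_delay)


# Simulated master that refuses the first two connection attempts.
outcomes = [socket.error("refused"), socket.error("refused"), "connected"]


def fake_connect():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


waits = []  # records the requested delays instead of actually sleeping
result, attempts = connect_with_retry(fake_connect, retry_delay=60, sleep=waits.append)
```

Passing the sleep function in as a parameter is only there to make the sketch quick to run; the real slave simply calls time.sleep(60).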
The one change made to run() is that, instead of performing the heartbeat code directly, it calls self.perform_action().
    def perform_action(self):
        logger.info("Now sending a keepalive to the master")
        send_message(self.sock, create_payload("I am still alive, client: {num}".format(num=self.client_number)))
        time.sleep(5)
This method performs the basic heartbeat we had in the basic slave script. However, since it is a method it can easily be overridden in a child class.
Going forward, new slaves will subclass this class and override the perform_action() method. This means that all the slaves will share the same connection code and disconnection handling.
The benefit of this is that as we improve the connection handling code, all slaves will pick up the improvement.
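To illustrate how a subclass would plug into this, here is a rough sketch with the networking stripped out. SketchSlaveBase stands in for RpiBasicSlaveThread, and TemperatureSlave, run_once() and the message strings are hypothetical; none of them exist in the cluster code.

```python
class SketchSlaveBase(object):
    """Stand-in for RpiBasicSlaveThread with the networking removed."""

    def __init__(self):
        self.sent = []  # records messages instead of writing to a socket

    def send(self, message):
        self.sent.append(message)

    def run_once(self):
        # Shared step every slave performs, followed by the overridable hook.
        self.send("computer_details")
        self.perform_action()

    def perform_action(self):
        # Default behaviour: the basic heartbeat.
        self.send("heartbeat")


class TemperatureSlave(SketchSlaveBase):
    """Hypothetical slave that reports a reading instead of a heartbeat."""

    def perform_action(self):
        self.send("temperature: 42.0")


basic = SketchSlaveBase()
basic.run_once()
custom = TemperatureSlave()
custom.run_once()
```

Both objects send the node details first because that step lives in the parent class; only the action performed afterwards differs.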
Reviewing the basic_slave.py script
Now that the majority of this code has been abstracted, the basic_slave script is significantly shorter.
config = ConfigParser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'rpicluster.cfg'))
socket_port = config.getint("slave", "socket_port")
master_ip = config.get("slave", "master_ip")
add_file_logger("slave.log")
basic_slave_thread = RpiBasicSlaveThread(master_ip, socket_port)
basic_slave_thread.start()
The main change involves creating the RpiBasicSlaveThread object and then starting it running in a new thread with start(). The rest of the logic that was previously here now lives in the newly created class.
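For reference, the script expects rpicluster.cfg to contain a slave section with master_ip and socket_port keys. The sketch below shows what such a file might look like and how those two values are read. The address and port are made-up values, and it is written for Python 3, where the module is named configparser instead of ConfigParser.

```python
import configparser

# Hypothetical contents of rpicluster.cfg; the address and port are made-up values.
EXAMPLE_CONFIG = """
[slave]
master_ip = 192.168.1.10
socket_port = 31415
"""

config = configparser.ConfigParser()
config.read_string(EXAMPLE_CONFIG)

master_ip = config.get("slave", "master_ip")
socket_port = config.getint("slave", "socket_port")  # getint() converts the value to an int
```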
Summary of changes
Here we have moved the majority of the slave code into a new class. This has been refactored slightly so that we may start to implement new slaves.
The full code is available on GitHub. Any comments or questions can be raised there as issues or posted below.