Raspberry Pi Cluster Node – 09 Multi Slave Master

This post builds on my previous posts in the Raspberry Pi Cluster series by changing the master so that it accepts multiple slaves connecting to it.

Creating a thread to handle each client

Typically to handle multiple operations occurring at once in a program, you will use additional threads or processes. These run at the same time as your initial “main” thread and work very similarly to it.

In this tutorial I am going to be changing my master to create one thread per client, and handle the client wholly in the thread.

While using threading there is the possibility for bugs to appear due to the multi-threaded nature of code. This initial implementation will not use any complex features of threads, so we will largely ignore these problems.

This post doesn’t go into the details of how the GIL (Global Interpreter Lock) can slow down multi-threaded programs in python. This is a much more advanced topic and will be addressed in the future. For more information you can read the Global Interpreter Lock page on the python Wiki.

Creating a thread handler in python

In python one of the easiest ways to run threads in python is to create a class that inherits from threading.Thread.

At a basic level, you can create a run method inside your class. This method is called once the thread is created and has been requested to start running.

To start the thread and perform your processing, you need to instantiate the class. Once created, calling the start method will call your run method in the newly created thread. At the point the program in your main thread will continue running at the same time as the child thread.

For my system I will have the main thread handle connecting to the slaves. Once the connection is created it will be passed to another thread to process.

Below is the example of my class to handle the threads.

class RpiClusterClient(threading.Thread):

    def __init__(self, clientsocket, address):
        threading.Thread.__init__(self)
        self.clientsocket = clientsocket
        self.address = address

    def run(self):
        message = True
        while message:
            message = get_message(self.clientsocket)
            if message:
                if message['type'] == 'message':
                    logger.info("Received message: " + message['payload'])
                elif message['type'] == 'computer_details':
                    logger.info("Computer specifications: " + json.dumps(message['payload']))
                elif message['type'] == 'info':
                    logger.info("Slave wants to know my info about " + message['payload'])
                    if message['payload'] == 'computer_details':
                        self.clientsocket.send(create_payload(get_base_machine_info(), "master_info"))
                    else:
                        self.clientsocket.send(create_payload("unknown", "bad_message"))

            else:
                logger.info("Client disconnected at address {addr}".format(addr=self.address))

Here I create my class, RpiClusterClient, which is inheriting from the threading.Thread class so that I can run processing in this child thread.

The __init__ method is used to store data about the slave, the client socket and the address. The client socket is used to communicate with the connected slave so must be passed to the new thread.

The run method is moved from the original code created in the basic_master.py file in previous tutorials. Since this was previously discussed I wont go over what this is doing.

Improving the master to create threads

Now there is a class to handle the thread processing we can improve the master to use it.

Instead of listening to a single connection, I have set it to listen to up to 10. For a larger system this number may need to be increased.

socket.listen(10) #listen to 10 connects
while True:
    (clientsocket, address) = socket.accept()
    logger.info("Got client at {address}".format(address=address))

    rpiClient = RpiClusterClient(clientsocket, address)
    rpiClient.start()

Since I am connecting to more than one slave the socket.accept() call is within a while True loop to run forever.

Inside this loop we accept the connection, then create an instance of the RpiClusterClient. Once created start() is called on it to initialise the thread and begin running the run() method.

Once start is called the thread handles the connection to the slave. At this point the master does not talk to the slave in the main thread. All the communication is handled in the child threads.

Summary of thread handling improvements

This tutorial focuses on improving the master so that it may talk to many different slaves. This now means the cluster is able to start distributing tasks to many nodes at once.

Currently there is no error handling for nodes connecting and disconnecting. The next tutorial will focus on hardening the master and slaves connection/disconnection code.

The full code is available on Github, any comments or questions can be raised there as issues or posted below.

One Comment

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.