Raspberry Pi Cluster Node – 05 Talking to nodes with JSON

This post builds on my previous posts in the Raspberry Pi Cluster series by changing the format of the data I send. In this tutorial I am now sending data as JSON to allow a richer set of messages to be sent.

Why use JSON to send data

In previous tutorials I was sending raw data to the client and printing out the output. However, there was no way to tell the end of one message from the start of another. There was also no predefined format for the messages I would send, so they could not be parsed by any program.

One of the advantages of using JSON is that the format is widely used in different applications and can express rich objects in a simple data structure. Python has the easy-to-use json module, which can encode primitive Python data structures as JSON and decode JSON back into them.
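As a quick illustration of that round trip, here is a minimal sketch. The dictionary contents are made up for the example and are not messages the cluster actually sends.

```python
import json

# A hypothetical status message; the keys are illustrative only.
original = {"msg": "Hello", "node": 3, "tags": ["pi", "cluster"], "ok": True}

encoded = json.dumps(original)   # Python dict -> JSON string
decoded = json.loads(encoded)    # JSON string -> Python dict

print(type(encoded).__name__)    # str
print(decoded == original)       # True
```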

If I want other languages to talk to my Python cluster, the only requirements are that they can open sockets and send JSON. The majority of languages can do both, so this choice keeps our options open.

There are more advanced ways to perform remote operations, using libraries such as RPyC. However, these handle a lot of the complexity internally and restrict us to purely using Python. To fully explore some of the interesting problems of distributed computing, I want to deal with some of these issues myself.

So instead of sending raw data I am going to start sending JSON encoded data that can be read and understood by our cluster.

The DataPackager file

I have created the DataPackager file in our module to hold all the functions used to format and retrieve messages for the cluster.

The first issue I needed to address was how to tell the difference between the end of one message and the beginning of the next. Since I have decided to use JSON to send the data, I can use a single control character to signify the end of a message.

JSON can be encoded onto a single line with no line breaks and is still valid JSON. Python's json.dumps also escapes control characters inside strings, so a raw newline or carriage return never appears in the encoded output. So at the end of each full message sent from a slave I will add a carriage return (\r). This is what I will use to determine if I have reached the end of a message.

This means the code on the master retrieving the message only needs to keep reading until it has reached a carriage return. If I receive a long string of messages I can easily split on this carriage return and process each message.
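The reason a carriage return is safe as a separator can be checked directly. The sketch below, using a made-up message text, shows that json.dumps escapes control characters inside strings, so the raw separator never appears in the encoded output.

```python
import json

MESSAGE_SEPARATOR = "\r"

# A message whose text contains both a carriage return and a newline.
text = "line one\r\nline two"
encoded = json.dumps({"msg": text})

# json.dumps writes control characters as escape sequences (a backslash
# followed by a letter), so the raw characters are absent from the output.
print(MESSAGE_SEPARATOR in encoded)        # False
print("\n" in encoded)                     # False
print(json.loads(encoded)["msg"] == text)  # True
```

This only holds for JSON produced by an encoder that does not pretty-print. A sender in another language that emits indented JSON with \r\n line endings would break the framing.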

This makes the code to create a message relatively simple:

import json

MESSAGE_SEPARATOR = "\r"
def create_msg_payload(message):
    return json.dumps({'msg': message}) + MESSAGE_SEPARATOR

For now this is a simple function which creates a dictionary containing the message to send. The dictionary is then serialized using json.dumps, which turns an object into a JSON string. The final piece of code appends the message separator, which is the carriage return.

Receiving the messages on the master is a little more complex, but it is performed using the following code:

_buffered_string = ""
_buffered_messages = []

First we initialize two variables, the first to hold the buffered string we haven’t yet processed and the second to hold any messages we haven’t processed.

This is because we will handle one message at a time and may need to hold many messages while we deal with others.

Inside the module we have some helper functions. The first checks whether there is a message in the buffer:

def _get_message_in_buffer():
    global _buffered_messages
    if len(_buffered_messages) > 0:
        return json.loads(_buffered_messages.pop(0))
    else:
        return None

Here we check to see if we have any messages in the buffer. If we do we remove the first item in the list using pop(0) and decode the JSON using json.loads. Then we return this newly decoded JSON object. Note that using pop(0) on a list isn’t very efficient but I will talk about that in a later tutorial when looking at performance.

Another helper method I have created is used to check the currently buffered string for any messages.

def _check_buffer_for_messages():
    global _buffered_string, _buffered_messages
    split_buffered_data = _buffered_string.split(MESSAGE_SEPARATOR)
    if len(split_buffered_data) > 1:  # If we find more than one item, there is a message
        messages_to_process = split_buffered_data[0:-1]
        for message in messages_to_process:
            _buffered_messages.append(message)

        _buffered_string = split_buffered_data[-1]

Here I am first splitting the buffered data string by our chosen message separator (carriage return). If the split returns more than one element it means we have at least one message to parse.

Using list slicing I can select every element but the last and store it in the messages_to_process variable using split_buffered_data[0:-1]. This then lets me append each of these messages to the buffered messages variable.

Once I have stored all of the newly parsed messages I set the buffered string to the final element of the list, which is any trailing data that does not yet form a complete message.
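To make the splitting concrete, here is a small worked example on a made-up buffer holding two complete messages and the start of a third.

```python
MESSAGE_SEPARATOR = "\r"

# Two complete messages followed by the start of a third.
buffered = '{"msg": "a"}\r{"msg": "b"}\r{"msg": "c'

parts = buffered.split(MESSAGE_SEPARATOR)
complete, remainder = parts[:-1], parts[-1]

print(complete)   # ['{"msg": "a"}', '{"msg": "b"}']
print(remainder)  # {"msg": "c

# When the buffer ends exactly on a separator, the remainder is empty.
print('{"msg": "a"}\r'.split(MESSAGE_SEPARATOR))  # ['{"msg": "a"}', '']
```

The last element of the split is always the unfinished tail, which is why it is safe to keep it as the new buffered string even when it is empty.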

I use both of these functions in the publicly available get_message function that my master will call.

def get_message(clientsocket):
    global _buffered_string, _buffered_messages

    message_in_buffer = _get_message_in_buffer()
    if message_in_buffer:
        return message_in_buffer

    while True:
        try:
            data = clientsocket.recv(512) #Get at max 512 bytes of data from the client
        except socket.error: #If we failed to get data, assume they have disconnected
            return None
        data_len = len(data)
        if data_len > 0: #Do something if we got data
            _buffered_string += data #Keep track of our buffered stored data

            _check_buffer_for_messages()
            message_in_buffer = _get_message_in_buffer()
            if message_in_buffer:
                return message_in_buffer
        else:
            return None

The first thing I do here is to check if there are any messages already in the buffer. If there are then I return the first one using my helper method _get_message_in_buffer. This is because I don’t want to receive any further messages until I have dealt with the previous ones.

Once there are no more messages I start a while loop to keep getting data from the slave. I call the blocking call recv to receive some data from the slave. If new data is received then it is added to the buffered string variable. Once again I check to see if there are any new messages in the buffer. If I find any then I return the first one.

If there was a socket exception receiving the data, or I receive data with length 0, I know the socket has gone away. In this case I return None so that the script calling this knows there will be no more messages.

The script calling this would always expect a parsed JSON message or None in the event there are no more messages to handle. Once the method has returned None it is expected that the method will not be called again.
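The contract described above can be exercised end to end without a real cluster. The sketch below is an assumption-laden variant rather than the module's actual code: it targets Python 3, where recv returns bytes, and it keeps the buffers on a hypothetical MessageReader class so that each connection has its own state (the module-level buffers are shared by every caller in the process). A socket.socketpair stands in for the slave and master connection.

```python
import json
import socket

MESSAGE_SEPARATOR = b"\r"  # bytes, because recv returns bytes on Python 3


class MessageReader:
    """Hypothetical per-connection reader; not part of the original module."""

    def __init__(self, sock):
        self._sock = sock
        self._buffer = b""
        self._messages = []

    def get_message(self):
        """Return the next decoded message, or None once the peer has gone."""
        while not self._messages:
            try:
                data = self._sock.recv(512)
            except socket.error:
                return None
            if not data:  # zero bytes means the peer closed the connection
                return None
            self._buffer += data
            # Everything but the last piece is a complete message.
            *complete, self._buffer = self._buffer.split(MESSAGE_SEPARATOR)
            self._messages.extend(complete)
        return json.loads(self._messages.pop(0).decode("utf-8"))


# A connected pair of sockets stands in for the slave and the master.
slave, master = socket.socketpair()
slave.sendall(b'{"msg": "one"}\r{"msg": "two"}\r')
slave.close()

reader = MessageReader(master)
received = []
message = reader.get_message()
while message is not None:
    received.append(message["msg"])
    message = reader.get_message()
master.close()
print(received)  # ['one', 'two']
```

Both messages arrive in a single recv here, yet the caller still sees them one at a time, followed by None once the slave has closed its end.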

Changes to the master and slave

Now that I have my DataPackager module I can change my master and slave to use the new functions. The slave changes very little: instead of sending raw data, it sends data packaged up by the new module.

logger.info("Sending an initial hello to master")
sock.send(create_msg_payload("Hello World, I am client {num}".format(num=client_number)))
while True:
    time.sleep(5)
    sock.send(create_msg_payload("I am still alive, client: {num}".format(num=client_number)))

Here you can see that the only change is that I am using the create_msg_payload function to format the data.
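Two details are worth keeping in mind if this is run on Python 3, which is an assumption on my part since the post does not state a version. Sockets there carry bytes, so the payload needs encoding, and send may write only part of the data, whereas sendall keeps going until everything is written or an error is raised. A minimal sketch, with a socket.socketpair standing in for the real connection:

```python
import json
import socket

MESSAGE_SEPARATOR = "\r"

def create_msg_payload(message):
    return json.dumps({"msg": message}) + MESSAGE_SEPARATOR

# A connected socket pair stands in for the slave/master connection.
sock, peer = socket.socketpair()

payload = create_msg_payload("Hello World, I am client 1")
# Encode the string to bytes, then send every byte of it.
sock.sendall(payload.encode("utf-8"))

received = peer.recv(512)
sock.close()
peer.close()
print(received)
```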

The master has also changed slightly. Because get_message now handles socket errors, the master is more robust.

message = True
while message:
    message = get_message(clientsocket)
    if message:
        logger.info("Received message: " + message['msg'])
    else:
        logger.info("Client disconnected")

Here I have a while loop that continues until get_message returns None, indicating the socket has been closed. On each pass I call get_message, passing in our client socket, to see if there is any data.

Summary

In this tutorial I have shown how you can send messages that can easily be parsed by a computer. In the future I am going to send more complex messages and have the master handle them differently.

This modular code allows a simple message passing system to be created, reducing the overall complexity by using the shared DataPackager module I have written.

The full code is available on GitHub. Any comments or questions can be raised there as issues or posted below.
