Raspberry Pi Cluster Node – 17 InfluxDB Machine Stats Monitoring

This post builds on my previous posts in the Raspberry Pi Cluster series by performing a small refactor and storing cluster vitals data to InfluxDB.

Refactoring the codebase a little

To start with I will refactor the codebase a little so it is more split up. By introducing a few new package layers it should be easier to work with.

First, all classes that deal with secondary nodes are moved into their own package, and the classes for primary nodes are moved into theirs. This creates two new package locations:

RaspberryPiCluster/PrimaryNodes
RaspberryPiCluster/SecondaryNodes

With that done, I have tweaked the naming of some functions and files. The MachineInfo file has been renamed to NodeVitals because it collects the “vital information” about a node.

Inside this file, get_base_machine_info has been renamed to get_node_baseinfo. This function now returns “base information” about the node which won't change often, such as RAM, CPU, and swap size.

The temporal data (data that changes over time) has been removed from this function and added to a new function, get_current_node_vitals. This is designed to be called to obtain the “current” state of the node and any changing “vital information”.

In addition to this, the method now returns an object VitalsPayload which is designed to hold information about the current vitals. By abstracting this data into an object it can have helper methods to load and format the payload to send to the primary.
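As a rough illustration of the idea, a payload object along these lines could hold the vitals and flatten them for sending. Only the names VitalsPayload and get_flat_payload, plus the four attribute names, appear in this post. The dataclass layout and the load_payload helper are my assumptions for this sketch, not necessarily what the project code looks like.

```python
from dataclasses import dataclass, asdict


@dataclass
class VitalsPayload:
    """Holds one snapshot of a node's changing vitals (illustrative sketch)."""

    cpu_frequency: float
    cpu_percentage: float
    ram_free: int
    swap_free: int

    def get_flat_payload(self):
        # Flatten to a plain dict so it can be serialised and sent to the primary
        return asdict(self)

    @classmethod
    def load_payload(cls, payload):
        # Hypothetical helper: rebuild the object from a received flat dict
        return cls(**payload)


# Example values are made up purely for demonstration
vitals = VitalsPayload(cpu_frequency=1400.0, cpu_percentage=12.5,
                       ram_free=512000000, swap_free=104857600)
flat = vitals.get_flat_payload()
restored = VitalsPayload.load_payload(flat)
```

Keeping the formatting logic on the object means the secondary and the primary can share one definition of what a vitals message looks like.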

Reporting current node vitals for secondary nodes

Now that the node vitals methods have been refactored, they can be used to report data to the primary. This will be helpful as the primary will then be able to determine how loaded each node is. This could be important when deciding what work should be scheduled on what node.

Once the secondary has connected to the primary it currently enters a loop where it calls perform_action repeatedly. In the new code below it checks whether its vitals have been sent in the last 60 seconds. If they haven’t, it first reports its vitals to the primary and then performs some action.

vitals_last_sent = 0
while True:
	if (vitals_last_sent + 60) < time.time():
		logger.info("Sending vitals to primary node")
		current_vitals = get_current_node_vitals()
		self.connection_handler.send_message(current_vitals.get_flat_payload(), "vitals")
		vitals_last_sent = time.time()
	self.perform_action()

When the primary begins to distribute work it will be able to use this information to determine what should run.

At the moment, if perform_action takes a long time the vitals will not be sent very frequently, so I will look at moving the reporting into a second thread in the future.
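One possible shape for that second thread is sketched below. This is not code from the project: VitalsReporter, send_vitals, and interval are names I have made up for illustration, and the lambda stands in for the real call that sends vitals over the connection handler.

```python
import threading
import time


class VitalsReporter(threading.Thread):
    """Hypothetical background reporter: calls send_vitals every `interval` seconds."""

    def __init__(self, send_vitals, interval=60):
        super().__init__(daemon=True)
        self.send_vitals = send_vitals
        self.interval = interval
        self._stop_requested = threading.Event()

    def run(self):
        while not self._stop_requested.is_set():
            self.send_vitals()
            # wait() returns early if stop() is called, so shutdown is prompt
            self._stop_requested.wait(self.interval)

    def stop(self):
        self._stop_requested.set()


# Demonstration with a short interval; a real node would keep the 60 second default
sent = []
reporter = VitalsReporter(lambda: sent.append(time.time()), interval=0.05)
reporter.start()
time.sleep(0.3)  # stands in for a slow perform_action()
reporter.stop()
reporter.join(timeout=1)
```

If the reporter thread and the main loop share one connection, sending from both at once would probably need a lock around the send call.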

Storing Vital Data in InfluxDB

With the above changes the secondary is now regularly sending its vital data to the primary. We are going to store this data using InfluxDB so we can see it over time.

To send data to InfluxDB I am using the official Python library, influxdb. This allows connecting to, reading from, and writing data into an Influx database. As the primary is currently the only node writing to Influx, I will only install the Influx library on that node.

To handle the connection to InfluxDB I have created a class to hold the information.

import datetime
from influxdb import InfluxDBClient


class RpiInfluxClient:
    """ Simple helper class which will hold the details of the RPI Influx Client and make it a little easier to use """

    def __init__(self, influxdb_host, influxdb_port, influxdb_database_prefix):

        self.influxdb_host = influxdb_host
        self.influxdb_port = influxdb_port
        self.influxdb_database_prefix = influxdb_database_prefix
        self.node_vitals_database_name = self.influxdb_database_prefix + "node_vitals"
        self.node_name = None
        # We only connect once we are ready to connect
        self.influx_client = None

    def add_node_name(self, node_name):
        self.node_name = node_name

    def connect(self):

        """ Connects to the influx db Database using their client and sets up the databases needed (if needed)"""
        self.influx_client = InfluxDBClient(self.influxdb_host, self.influxdb_port)
        self.influx_client.create_database(self.node_vitals_database_name)
        self.influx_client.create_retention_policy("node_vitals_year_rp", '365d', 3, database=self.node_vitals_database_name, default=True)

The class is given all the information needed to create a connection to the Influx database. The connect method begins the connection and sets up the data structures in Influx: it creates the database that stores the data and configures a simple retention policy for it.

There are also two methods which are used to write data into InfluxDB. The first is a private method to write a datapoint to InfluxDB. This will be used to keep a standard format when writing data to InfluxDB.

The second is a function which takes the vitals and stores them in InfluxDB using the first. It splits apart the vitals data structure and sends the data one measurement at a time.

def _write_datapoint(self, measurement, values):
	if self.node_name is None:
		# We shouldn't ever encounter this, but if we do I want it to fail hard so we can debug (not a good idea for production though)
		raise Exception("Cannot write node value without node name")

	points_to_write = [
		{
			"measurement": measurement,
			"tags": {
				"node": self.node_name
			},
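			# The trailing "Z" marks the timestamp as UTC, so take the time in UTC rather than local time
			"time": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),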
			"fields": values
		}
	]

	self.influx_client.write_points(points_to_write, database=self.node_vitals_database_name)

def log_vitals(self, vitals):
	# TODO: Write these in one write_points API call rather than lots of smaller ones
	self._write_datapoint("cpu", {
		"frequency": vitals.cpu_frequency,
		"percentage": vitals.cpu_percentage
	})
	self._write_datapoint("ram", {
		"free": vitals.ram_free
	})
	self._write_datapoint("swap", {"free": vitals.swap_free})

Since the Python InfluxDB client allows writing multiple points at once, a future improvement is to make a single call to write_points. I have left a TODO comment in the code as a reminder to do this.
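A minimal sketch of that improvement might look like the following. The function names build_vitals_points and log_vitals_batched are hypothetical, and the RecordingClient is only a stand-in so the example runs without a database. With the real library, influx_client would be an InfluxDBClient, whose write_points accepts a list of points.

```python
import datetime
from types import SimpleNamespace


def build_vitals_points(node_name, vitals, timestamp=None):
    """Build every vitals point up front so they can go out in one write_points call."""
    if timestamp is None:
        now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    measurements = {
        "cpu": {"frequency": vitals.cpu_frequency, "percentage": vitals.cpu_percentage},
        "ram": {"free": vitals.ram_free},
        "swap": {"free": vitals.swap_free},
    }
    return [
        {"measurement": name, "tags": {"node": node_name}, "time": timestamp, "fields": fields}
        for name, fields in measurements.items()
    ]


def log_vitals_batched(influx_client, database, node_name, vitals):
    # One request instead of three separate writes
    points = build_vitals_points(node_name, vitals)
    influx_client.write_points(points, database=database)
    return points


class RecordingClient:
    """Stand-in for InfluxDBClient that just records what it was asked to write."""

    def __init__(self):
        self.calls = []

    def write_points(self, points, database=None):
        self.calls.append((points, database))


# Made-up vitals and names, purely for demonstration
client = RecordingClient()
example_vitals = SimpleNamespace(cpu_frequency=1400.0, cpu_percentage=10.0,
                                 ram_free=100, swap_free=50)
points = log_vitals_batched(client, "example_node_vitals", "node1", example_vitals)
```

Sharing one timestamp across the three points also means the cpu, ram, and swap readings line up exactly when graphed.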

As more data is added to the vitals object over time, this method will log it too. For now it only logs the basic data in the object.

After these changes the primary is aware of the current node vitals of all connected nodes, and is storing this for tracking purposes.

Summary of adding node monitoring

After some refactoring the secondary nodes now all report their data to the primary. This is logged by the primary into InfluxDB, where it can be viewed to monitor the status of the cluster.

Going forward more information will be added to keep an eye on the nodes.

The full code is available on GitHub. Any comments or questions can be raised there as issues or posted below.
