Building a CPU monitor

Now that we have a grasp on the main reactive programming concepts, we can implement a sample application. In this subsection, we will implement a monitor that will give us real-time information about our CPU usage and is capable of detecting spikes.

The complete code for the CPU monitor can be found in the cpu_monitor.py file.

As a first step, let's implement a data source. We will use the psutil module that provides a function, psutil.cpu_percent, that returns the latest available CPU usage as a percent (and doesn't block):

    import psutil
psutil.cpu_percent()
# Result: 9.7

Since we are developing a monitor, we would like to sample this information over a few time intervals. To accomplish this we can use the familiar Observable.interval , followed by map just like we did in the previous section. Also, we would like to make this observable hot as, for this application, all subscribers should receive a single source of data; to make Observable.interval hot,  we can use the publish and connect methods. The full code for the creation of the cpu_data observable is as follows

    cpu_data = (Observable
.interval(100) # Each 100 milliseconds
.map(lambda x: psutil.cpu_percent())
.publish())
cpu_data.connect() # Start producing data

We can test our monitor by printing a sample of 4 items

    cpu_data.take(4).subscribe(print)
# Output:
# 12.5
# 5.6
# 4.5
# 9.6

Now that our main data source is in place, we can implement a monitor visualization using matplotlib. The idea is to create a plot that contains a fixed amount of measurements and, as new data arrives, we include the newest measurement and remove the oldest one. This is commonly referred to as a moving window and is better understood with an illustration. In the following figure, our cpu_data stream is represented as a list of numbers. The first plot is produced as soon as we have the first four numbers and, each time a new number arrives, we shift the window by one position and update the plot:

To implement this algorithm, we can write a function, called monitor_cpu, that will create and update our plotting window. The function will do the following things:

  • Initialize an empty plot and set up the correct plot limits.
  • Transform our cpu_data observable to return a moving window over the data. This can be accomplished using the buffer_with_count operator, which will take the number of points in our window, npoints, as parameters and the shift as 1.
  • Subscribe to this new data stream and update the plot with the incoming data.

The complete code for the function is shown here and, as you can see, is extremely compact. Take some time to run the function and play with the parameters:

    import numpy as np
from matplotlib import pyplot as plt

def monitor_cpu(npoints):
lines, = plt.plot([], [])
plt.xlim(0, npoints)
plt.ylim(0, 100) # 0 to 100 percent

cpu_data_window = cpu_data.buffer_with_count(npoints, 1)

def update_plot(cpu_readings):
lines.set_xdata(np.arange(npoints))
lines.set_ydata(np.array(cpu_readings))
plt.draw()

cpu_data_window.subscribe(update_plot)

plt.show()

Another feature we may want to develop is, for example, an alert that triggers when the CPU has been high for a certain amount of time as this may indicate that some of the  processes in our machine are working very hard. This can be accomplished by combining buffer_with_count and map. We can take the CPU stream and a window, and then we will test whether all items have a value higher than twenty percent usage (in a quad-core CPU that corresponds to about one processor working at hundred percent) in the map function. If all the points in the window have a higher than twenty percent usage, we display a warning in our plot window.

The implementation of the new observable can be written as follows and will produce an observable that emits True if the CPU has high usage, and False otherwise:

    alertpoints = 4    
high_cpu = (cpu_data
.buffer_with_count(alertpoints, 1)
.map(lambda readings: all(r > 20 for r in readings)))

Now that the high_cpu observable is ready, we can create a matplotlib label and subscribe to it for updates:

    label = plt.text(1, 1, "normal")
def update_warning(is_high):
if is_high:
label.set_text("high")
else:
label.set_text("normal")
high_cpu.subscribe(update_warning)
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.12.123.189