Reading streaming data sources

What if the data coming from the source is continuous and we need to read it as it arrives? This recipe demonstrates a simple solution that works for many common real-life scenarios. It is not universal, though, and you will need to modify it if you hit a special case in your application.

How to do it...

In this recipe, we will show you how to read an ever-changing file and print its new content as it is appended. We will use common modules from the Python standard library to accomplish this, as shown here:

import os
import sys
import time

if len(sys.argv) != 2:
    print("Please specify filename to read", file=sys.stderr)
    sys.exit(1)

filename = sys.argv[1]

if not os.path.isfile(filename):
    print('Given file: "%s" is not a file' % filename, file=sys.stderr)
    sys.exit(1)

with open(filename, 'r') as f:
    # Move to the end of file
    f.seek(0, os.SEEK_END)

    # endlessly loop
    while True:
        # remember the current position
        where = f.tell()
        # try reading a line
        line = f.readline()
        # if empty, wait and go back
        if not line:
            time.sleep(1)
            f.seek(where)
        else:
            # end='' prevents print() from adding a newline, as
            # readline() already read that.
            print(line, end='')

How it works...

The core of the code is inside the while True: loop. This loop never stops (unless we interrupt it by pressing Ctrl + C on our keyboard). Before entering the loop, we move to the end of the file we are reading. Inside the loop, we record the current position using tell() and then try to read a line. If there is no line, nothing has been added to the file since we last checked. So, we sleep for one second, return to the recorded position using seek(), and then try again.

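If there is a non-empty line, we print it and suppress the extra newline character that print would otherwise add, because readline() already returns the line with its trailing newline.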
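The polling loop described above can also be wrapped in a generator, which separates waiting for new lines from whatever is done with them. The following is a minimal sketch rather than the recipe's own code: the function name follow and its parameters poll_interval and max_idle_polls are assumptions introduced here for illustration, and max_idle_polls exists only so that the loop can give up when no data arrives.

```python
import time


def follow(f, poll_interval=1.0, max_idle_polls=None):
    """Yield lines appended to the already open file object f.

    max_idle_polls is a hypothetical safety limit: after that many
    consecutive empty reads the generator stops. None means poll forever.
    """
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        # Remember where we are before trying to read.
        where = f.tell()
        line = f.readline()
        if not line:
            # Nothing new yet: wait, then return to the remembered position.
            idle += 1
            time.sleep(poll_interval)
            f.seek(where)
        else:
            idle = 0
            yield line
```

A caller would open the file, move to its end with f.seek(0, os.SEEK_END), and then iterate over follow(f), printing or parsing each line as it is produced.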

There's more...

We might also want to read the last N lines. We could do that by going almost to the end of the file, that is, by seeking with file.seek(filesize - N * avg_line_len), where filesize is the size of the file in bytes. Here, avg_line_len should be an approximation of the average line length in that file (for example, approximately 1,024 bytes). Then, we could use readlines() from that point to read the remaining lines and print just the last N lines from that list, that is, lines[-N:].
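The idea of reading the last N lines can be sketched as follows. This is one possible implementation under stated assumptions, not a definitive one: the function name tail is hypothetical, the file is assumed to be UTF-8 text, and it is opened in binary mode so that seeking to an arbitrary byte offset is safe. If the real lines are longer than avg_line_len, fewer than N lines may be returned.

```python
import os


def tail(filename, n, avg_line_len=1024):
    """Return up to the last n lines of filename as a list of strings."""
    if n <= 0:
        return []
    filesize = os.path.getsize(filename)
    # Jump back by an estimated n lines, but never before the file start.
    offset = max(filesize - n * avg_line_len, 0)
    with open(filename, "rb") as f:
        f.seek(offset)
        lines = [raw.decode("utf-8", errors="replace")
                 for raw in f.readlines()]
    if offset > 0:
        # We most likely landed in the middle of a line, so drop it.
        lines = lines[1:]
    return lines[-n:]
```

Calling tail(filename, 10) would then return a list that can be printed line by line before the continuous reading starts.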

The idea from this example can be used for various solutions. For example, the input does not have to be a local file; it only has to be a file-like object, such as a remote HTTP-accessible resource. Thus, one can read the input from a remote service, parse it continuously, and update live charts or an intermediate queue, buffer, or database.
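To illustrate how the same approach applies to any file-like object, here is a small sketch that parses incoming lines and keeps only the most recent values in a bounded buffer, as a live chart might. The function name consume, the parse parameter, and the one-number-per-line input format are assumptions made for this example.

```python
import io
from collections import deque


def consume(stream, buffer, parse=float):
    """Read lines from any file-like object, parse them, and buffer them."""
    for line in stream:
        line = line.strip()
        if not line:
            # Skip blank lines instead of failing to parse them.
            continue
        buffer.append(parse(line))
    return buffer


# A deque with maxlen keeps only the newest values, dropping the oldest.
recent = deque(maxlen=3)
consume(io.StringIO("1\n2\n\n3\n4\n"), recent)
```

Here io.StringIO stands in for the real source. The response object returned by urllib.request.urlopen() is file-like as well, although it yields bytes that would need decoding before parsing.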

One particular module is very useful for stream handling: io. It has been available since Python 2.6, was built as a replacement for the built-in file object, and is the default interface in Python 3.x.
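As a brief illustration, an in-memory io.StringIO stream offers the same readline(), tell(), and seek() methods that the recipe uses on a file, so the read-and-go-back pattern can be tried without touching the disk. The variable names below are chosen only for this sketch.

```python
import io

stream = io.StringIO()
stream.write("first\n")
stream.seek(0)

first = stream.readline()      # reads the only line available so far
where = stream.tell()          # remember the position after it
empty = stream.readline()      # an empty string means no new data yet

# Simulate new data arriving at the end of the stream.
stream.seek(0, io.SEEK_END)
stream.write("second\n")

# Go back to the remembered position and read the new line.
stream.seek(where)
second = stream.readline()
```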

In more complex data pipelines, we will need to introduce some sort of message queue, where incoming continuous data is held for some time before we are able to accept it. This enables us, as consumers of the data, to pause processing if we are overloaded. Having the data on a common message bus also enables other clients on the project to consume the same data without interfering with our software.
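The queuing idea can be tried in-process with the standard library before reaching for a real message bus. The sketch below uses queue.Queue with a small maxsize, so the producer blocks whenever the consumer falls behind. It is only a stand-in for a message broker, and the names producer, consumer, and run_pipeline, as well as the doubling step, are assumptions for illustration.

```python
import queue
import threading


def producer(q, items):
    """Put incoming items on the queue; put() blocks while it is full."""
    for item in items:
        q.put(item)
    q.put(None)  # sentinel telling the consumer there is no more data


def consumer(q, results):
    """Take items off the queue at our own pace and process them."""
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item * 2)  # stand-in for real processing


def run_pipeline(items, maxsize=2):
    """Run one producer and one consumer over a bounded queue."""
    q = queue.Queue(maxsize=maxsize)
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With a single consumer, items are processed in the order they were queued, so run_pipeline(range(5)) returns the doubled values in their original order.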
